微信群信息采集_大数据_w_t_y_y的博客-CSDN博客

来源: 微信群信息采集_大数据_w_t_y_y的博客-CSDN博客

一、数据库:

1、微信账号表

-- WeChat robot table: one row per logged-in WeChat account that collects group data.
-- utf8mb4 is used (instead of utf8) so that nicknames containing emoji can be stored
-- and so that robot_id compares cleanly with the utf8mb4 columns of t_wechat_group.
CREATE TABLE `t_wechat_robot` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增长id',
  `robot_id` varchar(255) DEFAULT NULL COMMENT '机器Id',
  `robot_name` varchar(255) DEFAULT NULL COMMENT '机器名称',
  `robot_nick_name` varchar(255) DEFAULT NULL COMMENT '机器微信昵称',
  `status` tinyint(1) DEFAULT NULL COMMENT '状态,0在线,1离线',
  `is_delete` tinyint(1) DEFAULT NULL COMMENT '删除标志,1 删除,0未删除',
  `create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='微信群聊机器表';
并手动新增一条我的微信号记录:

-- Seed row for the collecting account; replace MYWXNICKNAME with your own WeChat nickname.
INSERT INTO `t_wechat_robot`
  (`id`, `robot_id`, `robot_name`, `robot_nick_name`, `status`, `is_delete`, `create_time`, `update_time`)
VALUES
  (1, '1', '机器1', 'MYWXNICKNAME', 0, 0, NULL, '2019-04-24 14:57:00');
2、微信群表

-- WeChat group table: one row per group seen by a robot.
CREATE TABLE `t_wechat_group` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增长id',
  `robot_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '机器id',
  `group_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '群Id',
  `group_user_name` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '群名',
  `group_nick_name` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `head_img_url` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '群头像',
  `member_count` int(11) DEFAULT '0' COMMENT '群成员数',
  `owner_nick_name` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '群主',
  `chat_room_id` int(11) DEFAULT NULL COMMENT '聊天室id',
  `create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `is_delete` tinyint(1) DEFAULT NULL COMMENT '删除标志',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=861 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='微信群';
3、微信群成员表

-- WeChat group member table: one row per member of a recorded group.
CREATE TABLE `t_wechat_group_member` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增长id',
  `group_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '群id',
  `user_name` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '用户名',
  `nick_name` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `mem_status` int(11) DEFAULT NULL COMMENT '状态',
  `uin` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT 'uin',
  `create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `is_delete` tinyint(4) DEFAULT NULL COMMENT '删除标志,1 删除,0未删除',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
mongo群消息表:

项目:

 

二、api

此处省略部分get和set方法:

1、PageParam:

package com.wtyy.common;
import java.io.Serializable;
/**
 * Classic page-number based paging parameters.
 *
 * <p>The page index is 1-based and the page size must lie in
 * [{@code MIN_PAGE_SIZE}, {@code MAX_PAGE_SIZE}]. Invalid values are rejected with an
 * {@link IllegalArgumentException} (a {@link RuntimeException}, as before).
 */
public class PageParam implements Serializable {

    private static final long serialVersionUID = -5552159160388121108L;
    private static final int MIN_PAGE_SIZE = 1;
    private static final int MAX_PAGE_SIZE = 1000;
    private static final int DEFAULT_PAGE_SIZE = 10;

    /** Current page number, starting at 1. */
    private int pageIndex;

    /** Number of records per page. */
    private int pageSize = DEFAULT_PAGE_SIZE;

    /**
     * Creates paging parameters.
     *
     * @param pageIndex 1-based page number
     * @param pageSize  records per page, within the allowed range
     * @throws IllegalArgumentException if either argument is out of range
     */
    public PageParam(int pageIndex, int pageSize) {
        // Validate through private helpers instead of the overridable setters so that a
        // subclass cannot observe a half-constructed instance.
        this.pageIndex = checkPageIndex(pageIndex);
        this.pageSize = checkPageSize(pageSize);
    }

    /**
     * Returns the 0-based offset of the first record on the current page.
     *
     * @return {@code (pageIndex - 1) * pageSize}
     * @throws ArithmeticException if the offset does not fit in an {@code int}
     */
    public int getStartIndex() {
        // Compute in long: a huge page index would otherwise wrap to a negative offset.
        long start = (long) (pageIndex - 1) * pageSize;
        if (start > Integer.MAX_VALUE) {
            throw new ArithmeticException("start index overflows int: " + start);
        }
        return (int) start;
    }

    public int getPageIndex() {
        return pageIndex;
    }

    /**
     * @param pageIndex 1-based page number
     * @throws IllegalArgumentException if {@code pageIndex < 1}
     */
    public void setPageIndex(int pageIndex) {
        this.pageIndex = checkPageIndex(pageIndex);
    }

    public int getPageSize() {
        return pageSize;
    }

    /**
     * @param pageSize records per page
     * @throws IllegalArgumentException if the value is outside the allowed range
     */
    public void setPageSize(int pageSize) {
        this.pageSize = checkPageSize(pageSize);
    }

    /** Returns {@code pageIndex} if it is at least 1, otherwise throws. */
    private static int checkPageIndex(int pageIndex) {
        if (pageIndex < 1) {
            throw new IllegalArgumentException("page index should > 0");
        }
        return pageIndex;
    }

    /** Returns {@code pageSize} if it is within the allowed range, otherwise throws. */
    private static int checkPageSize(int pageSize) {
        if (pageSize < MIN_PAGE_SIZE || pageSize > MAX_PAGE_SIZE) {
            throw new IllegalArgumentException(
                    String.format("page size should be ranged in [%s, %s]", MIN_PAGE_SIZE, MAX_PAGE_SIZE));
        }
        return pageSize;
    }

}
2、Pager:

 

package com.wtyy.common;
import java.io.Serializable;
import java.util.List;

/**
* Classic paging result: the total number of matching records plus the records
* of the current page.
*
* @param <T> element type of the page
*/
public class Pager<T> implements Serializable {

private static final long serialVersionUID = -9134108412928477507L;

/**
* Total number of records.
*/
private int totalCount;

/**
* Records of the current page.
*/
private List<T> list;

/**
* Creates a page result.
*
* @param totalCount total number of records
* @param list records of the current page; stored as-is, without copying
*/
public Pager(int totalCount, List<T> list){
this.totalCount = totalCount;
this.list = list;
}
}
3、WechatMsgType微信群消息类型(整型):

package com.wtyy.constant;

/**
 * WeChat group message types identified by their integer code.
 *
 * <p>Each constant pairs a display name with the numeric type code.
 */
public enum WechatMsgType {

    /** Text message, code 1. */
    TEXT("文本", 1),

    /** Picture message, code 3. */
    PICTURE("图片", 3),

    /** Voice recording message, code 34. */
    RECORDING("语音", 34),

    /** Video message, code 43. The constant name is kept as-is for compatibility. */
    Video("视频", 43);

    /**
     * @param typeName display name of the type
     * @param typeCode integer code of the type
     */
    private WechatMsgType(String typeName, Integer typeCode) {
        this.typeName = typeName;
        this.typeCode = typeCode;
    }

    /** Display name of the type. */
    private String typeName;

    /** Integer code of the type. */
    private Integer typeCode;

    public String getTypeName() {
        return typeName;
    }

    /**
     * Kept for backward compatibility. Enum constants are shared across the whole JVM,
     * so calling this changes the name seen by every user of the constant.
     */
    public void setTypeName(String typeName) {
        this.typeName = typeName;
    }

    public Integer getTypeCode() {
        return typeCode;
    }

    /**
     * Kept for backward compatibility. Enum constants are shared across the whole JVM,
     * so calling this changes the code seen by every user of the constant.
     */
    public void setTypeCode(Integer typeCode) {
        this.typeCode = typeCode;
    }

}
4、WechatType微信群消息类型(字符型):

package com.wtyy.constant;

/**
 * WeChat group message types identified by name (the string counterpart of the
 * integer-coded message types).
 */
public enum WechatType {

    /** Text message. */
    Text,

    /** Picture message. */
    Picture,

    /** Voice recording message. */
    Recording,

    /** Video message. */
    Video
}
5、WechatRobot登录机器:

package com.wtyy.module;

import java.io.Serializable;
import java.util.Date;

/**
* A logged-in WeChat account ("robot") used to collect group data.
* Accessor methods are omitted in this listing.
*/
public class WechatRobot implements Serializable{
/**
* Serialization version identifier.
*/
private static final long serialVersionUID = -8066995470069020705L;

private Integer id;

private String robotId;

private String robotName;

private String robotNickName;

/**
* Status: 0 = online, 1 = offline.
*/
private Short status;

private Short isDelete;

private Date createTime;

private Date updateTime;

}
6、WechatGroup微信群:

package com.wtyy.module;

import java.io.Serializable;
import java.util.Date;

/**
* A WeChat group recorded for a robot.
* Accessor methods are omitted in this listing.
*/
public class WechatGroup implements Serializable{
/**
* Serialization version identifier.
*/
private static final long serialVersionUID = -7954472287669464909L;

private Integer id;

private String robotId;

private String groupId;

private String groupUserName;

private String groupNickName;

private String headImgUrl;

private Integer memberCount;

private String ownerNickName;

private Integer chatRoomId;

private Date createTime;

private Date updateTime;

private Short isDelete;

}
7、WechatGroupMember微信群成员:

package com.wtyy.module;

import java.io.Serializable;
import java.util.Date;

public class WechatGroupMember implements Serializable{
/**
*
*/
private static final long serialVersionUID = 3937137164822326424L;

private Integer id;

private String groupId;

private String userName;

private String nickName;

private Integer memStatus;

private Long uin;

private Date createTime;

private Date updateTime;

private Short isDelete;

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result
+ ((nickName == null) ? 0 : nickName.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
WechatGroupMember other = (WechatGroupMember) obj;
if (nickName == null) {
if (other.nickName != null)
return false;
} else if (!nickName.equals(other.nickName))
return false;
return true;
}

}
8、WechatGroupMsg微信群消息:

package com.wtyy.module;

import java.io.Serializable;
import java.util.Date;
/**
* A WeChat group message as stored by the application.
* Accessor methods are omitted in this listing.
*/
public class WechatGroupMsg implements Serializable{

private static final long serialVersionUID = -8707880890289759728L;

private String robotId;

private String groupId;

private String msgId;

private String fromUserName;

private String toUserName;

/**
* Message type as an integer code.
* @see com.wtyy.constant.WechatMsgType
*/
private Integer msgType;

/**
* Message type as a name.
* @see com.wtyy.constant.WechatType
*/
private String type;

/**
* File path.
*/
private String url;

/**
* Message content.
*/
private String content;

/**
* Message content (the raw message carries it under both "Content" and "Text").
*/
private String text;

/**
* File name, for file messages.
*/
private String fileName;

private Integer status;

private Integer imgStatus;

private Date createTime;

/**
* Nickname of the sender.
*/
private String actualNickName;

private String actualUserName;

private Integer statusNotifyCode;

private String statusNotifyUserName;

// NOTE(review): package-private, unlike every other field of this class — confirm
// whether this is intentional.
RecommendInfo recommendInfo;

}
9、RecommendInfo:

package com.wtyy.module;
import java.io.Serializable;

/**
* Recommendation details attached to a group message.
* Accessor methods are omitted in this listing.
*/
public class RecommendInfo implements Serializable{
private static final long serialVersionUID = 6421951838838130816L;

private String userName;

private String nickName;

private String qQNum;

private String content;
}
10、WechatService微信群信息服务:

package com.wtyy.service;
import java.util.List;
import com.wtyy.common.PageParam;
import com.wtyy.common.Pager;
import com.wtyy.module.WechatGroup;
import com.wtyy.module.WechatGroupMember;
import com.wtyy.module.WechatRobot;

/**
 * Service for WeChat robots, groups and group members.
 */
public interface WechatService {

    /** Adds a robot. */
    void addWechatRobot(WechatRobot wechatRobot);

    /** Updates a robot. */
    void updateWechatRobot(WechatRobot wechatRobot);

    /** Deletes the robot with the given id. */
    void deleteWechatRobot(Integer id);

    /** Looks up a robot id by the robot's nickname. */
    String getRobotIdByNickName(String nickName);

    /** Returns one page of robots. */
    Pager<WechatRobot> getPagerWechatRobot(PageParam pageParam);

    /** Adds a group. */
    void addWechatGroup(WechatGroup wechatGroup);

    /** Looks up a group id by the group's nickname. */
    String getGroupIdByGroupNickName(String groupNickName);

    /** Updates a group. */
    void updateWechatGroup(WechatGroup wechatGroup);

    /** Returns one page of groups for the given robot. */
    Pager<WechatGroup> getPagerWechatGroup(PageParam pageParam, String robotId);

    /** Adds the given members to a group. */
    void batchInsertWechatGroupMember(List<WechatGroupMember> groupMembers, String groupId);

    /** Returns the members of a group; an empty list, not {@code null}, when there are none. */
    List<WechatGroupMember> getMembersByGroupId(String groupId);

    /** Deletes the group members with the given ids. */
    void batchDeleteWechatGroupMember(List<Integer> ids, String groupId);

    /** Updates the member count of a group. */
    void updateWechatGroupSize(String groupId, int size);

}
11、WechatGroupMsgService微信群消息服务:

package com.wtyy.service;
import com.wtyy.common.PageParam;
import com.wtyy.common.Pager;
import com.wtyy.module.WechatGroupMsg;

/**
 * Service for WeChat group messages.
 */
public interface WechatGroupMsgService {

    /** Adds a group message. */
    void addGroupMsg(WechatGroupMsg wechatGroupMsg);

    /** Returns one page of group messages for the given robot and group. */
    Pager<WechatGroupMsg> getPagerWechatGroupMsg(PageParam pageParam, String robotId, String groupId);
}
三、service:

配置文件:

spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3307/itchat?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=wtyy

mybatis.mapper-locations=classpath*:Mapper/*Mapper.xml
server.port=8081

1、Dao:

看下ConfigDAO:这是因为存在特殊符号和一些表情,utf-8存不了(会报错\xF0\x9F\x91\x8D05…),需要utf8mb4。具体可见我的博客 mybatis分类下的异常处理。

package com.wtyy.dao;

import org.apache.ibatis.annotations.Update;

public interface ConfigDAO {

@Update(“set names utf8mb4”)
public void setCharsetToUtf8mb4();
}
其他的dao都是对mySQL的增删改查,就不粘了。

2、impl:

1、看下mongo的dao,其他的都是对mysql表的增删改查,不粘了。

package com.wtyy.impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.wtyy.module.WechatGroupMsg;
import com.wtyy.util.MongodbManager;

/**
* @Description: mongo存储微信群消息
*/
@Repository(“wechatGroupMsgDao”)
public class WechatGroupMsgDao {

///collection名
private String wechatMsgMongoCollection =”wechatmsg”;

public void addGroupMsg(WechatGroupMsg wechatGroupMsg) {
DBCollection dbCollection = MongodbManager.DB_TL.getCollection(wechatMsgMongoCollection);
DBObject documents = new BasicDBObject();
documents.put(“robot_id”, wechatGroupMsg.getRobotId());
documents.put(“group_id”, wechatGroupMsg.getGroupId());
documents.put(“msg_id”, wechatGroupMsg.getMsgId());
documents.put(“from_user_name”, wechatGroupMsg.getFromUserName());
documents.put(“to_user_name”, wechatGroupMsg.getToUserName());
documents.put(“msg_type”, wechatGroupMsg.getMsgType());
documents.put(“type”, wechatGroupMsg.getType());
documents.put(“content”, wechatGroupMsg.getContent());
documents.put(“text”, wechatGroupMsg.getText());
documents.put(“status”, wechatGroupMsg.getStatus());
documents.put(“img_status”, wechatGroupMsg.getImgStatus());
documents.put(“create_time”, wechatGroupMsg.getCreateTime());
documents.put(“file_name”, wechatGroupMsg.getFileName());
documents.put(“actual_nick_name”, wechatGroupMsg.getActualNickName());
documents.put(“actual_user_name”, wechatGroupMsg.getActualUserName());
documents.put(“status_notify_code”, wechatGroupMsg.getStatusNotifyCode());
documents.put(“status_notify_user_name”, wechatGroupMsg.getStatusNotifyUserName());
documents.put(“url”, wechatGroupMsg.getUrl());
dbCollection.insert(documents );
}

}
3、util:

package com.wtyy.util;

import java.net.UnknownHostException;
import java.util.ArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import com.mongodb.DB;
import com.mongodb.MongoClient;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;

/**
* mongodb管理器,提供DB对象。
*/
public class MongodbManager {

private static final Logger LOGGER = LoggerFactory.getLogger(“mongodbManager”);

private static MongoClient client;

//数据库名
private static String wechatMsgMongoDB = “wechatMsg”;

static {
String adressList = “localhost:27017”;

try {

String[] addresses = adressList.split(“,”);
ArrayList<ServerAddress> serverAddressList = new ArrayList<ServerAddress>();
for(int j=0; j < addresses.length; j++){
String[] address = addresses[j].split(“:”);
ServerAddress mongoAddress = new ServerAddress(address[0], Integer.parseInt(address[1]));
serverAddressList.add(mongoAddress);
}
client = new MongoClient(serverAddressList);
client.setReadPreference(ReadPreference.primaryPreferred());
LOGGER.info(“读写分离未启动”);

client.setWriteConcern(WriteConcern.SAFE);
LOGGER.info(“写入安全模式启动”);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}

public static final DB DB_TL = client.getDB(wechatMsgMongoDB);
}
4、启动类:

package com.wtyy;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;

import com.alibaba.dubbo.config.spring.context.annotation.EnableDubboConfig;
import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;

@SpringBootApplication
@MapperScan(“com.wtyy.dao”)
@ImportResource(“classpath:provider.xml”)
public class ServiceStart {
public static void main(String[] args) {
SpringApplication.run(ServiceStart.class, args);
}

}
四、rest:

1、配置文件:

application.properties:

spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3307/itchat?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=wtyy

server.port=8082

spring.redis.host=localhost
spring.redis.port=6379
#spring.redis.password=
spring.redis.database=1
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1
spring.redis.pool.max-idle=500
spring.redis.pool.min-idle=0
spring.redis.timeout=0

#kafka
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=localhost:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=myGroup
# 指定默认topic id
spring.kafka.template.default-topic= my-replicated-topic
# 指定listener 容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency= 3
# 每次批量发送消息的数量
spring.kafka.producer.batch-size= 1000
#key-value序列化反序列化
#spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.buffer-memory=524288
#群消息
wechat.msg = wechatmsg
#群列表
wechat.group = wechatgroup
pom:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wtyy</groupId>
<artifactId>itchat-rest</artifactId>
<version>0.0.1-SNAPSHOT</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.31</version>
</dependency>
<!-- api -->
<dependency>
<groupId>com.wtyy</groupId>
<artifactId>itchat-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>

<!-- mybatis -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.21</version>
</dependency>

<!-- dubbo -->
<dependency>
<groupId>com.alibaba.spring.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>

<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>

</project>
2、dto:是微信消息的原格式,这里封装成对象,后面可以直接用json转成dto,由于转成的dto首字母全部是大写,所以存储的时候又按照阿里的开发规范转换了一次。至于dto的封装,看消息的原格式就很清晰了:

{
“MsgId”: “5689968435146263788”,
“FromUserName”: “@863e84f7210f6ca0da17b018aedf81bcf8a173fee1d811d2769f101c4267e020”,
“ToUserName”: “@@c25f17fed47b6946321965da58fab144f674975dafd780a9e2e173de307e60b6”,
“MsgType”: 1,
“Content”: “我是消息”,
“Status”: 3,
“ImgStatus”: 1,
“CreateTime”: 1555677369,
“VoiceLength”: 0,
“PlayLength”: 0,
“FileName”: “”,
“FileSize”: “”,
“MediaId”: “”,
“Url”: “”,
“AppMsgType”: 0,
“StatusNotifyCode”: 0,
“StatusNotifyUserName”: “”,
“RecommendInfo”: {
“UserName”: “”,
“NickName”: “”,
“QQNum”: 0,
“Province”: “”,
“City”: “”,
“Content”: “”,
“Signature”: “”,
“Alias”: “”,
“Scene”: 0,
“VerifyFlag”: 0,
“AttrStatus”: 0,
“Sex”: 0,
“Ticket”: “”,
“OpCode”: 0
},
“ForwardFlag”: 0,
“AppInfo”: {
“AppID”: “”,
“Type”: 0
},
“HasProductId”: 0,
“Ticket”: “”,
“ImgHeight”: 0,
“ImgWidth”: 0,
“SubMsgType”: 0,
“NewMsgId”: 5689968435146263788,
“OriContent”: “”,
“EncryFileName”: “”,
“ActualNickName”: “发送人”,
“IsAt”: false,
“ActualUserName”: “@863e84f7210f6ca0da17b018aedf81bcf8a173fee1d811d2769f101c4267e020”,
“User”: {
“MemberList”: [
{
“MemberList”: [],
“Uin”: 0,
“UserName”: “@45161ba956dbd3d622499558e649c4beddee71fcd9277b4c8274c10f61295821”,
“NickName”: “成员1”,
“AttrStatus”: 102501,
“PYInitial”: “”,
“PYQuanPin”: “”,
“RemarkPYInitial”: “”,
“RemarkPYQuanPin”: “”,
“MemberStatus”: 0,
“DisplayName”: “”,
“KeyWord”: “”
}, {
“MemberList”: [],
“Uin”: 0,
“UserName”: “@d24ebe966bca4fbb6f08a886d3f2960b1cdfb5fa65113f7081e40b1b1bebbfcb”,
“NickName”: “成员2”,
“AttrStatus”: 233509,
“PYInitial”: “”,
“PYQuanPin”: “”,
“RemarkPYInitial”: “”,
“RemarkPYQuanPin”: “”,
“MemberStatus”: 0,
“DisplayName”: “”,
“KeyWord”: “”
}, {
“MemberList”: [],
“Uin”: 0,
“UserName”: “@863e84f7210f6ca0da17b018aedf81bcf8a173fee1d811d2769f101c4267e020”,
“NickName”: “成员3”,
“AttrStatus”: 16912485,
“PYInitial”: “”,
“PYQuanPin”: “”,
“RemarkPYInitial”: “”,
“RemarkPYQuanPin”: “”,
“MemberStatus”: 0,
“DisplayName”: “”,
“KeyWord”: “”
}
],
“Uin”: 0,
“UserName”: “@@c25f17fed47b6946321965da58fab144f674975dafd780a9e2e173de307e60b6”,
“NickName”: “我是群名称”,
“HeadImgUrl”: “/cgi-bin/mmwebwx-bin/webwxgetheadimg?seq=0&username=@@c25f17fed47b6946321965da58fab144f674975dafd780a9e2e173de307e60b6&skey=@crypt_15c50994_8b9ef531e394f9e342c232850ba31c0f”,
“ContactFlag”: 2,
“MemberCount”: 3,
“RemarkName”: “”,
“HideInputBarFlag”: 0,
“Sex”: 0,
“Signature”: “”,
“VerifyFlag”: 0,
“OwnerUin”: 0,
“PYInitial”: “”,
“PYQuanPin”: “”,
“RemarkPYInitial”: “”,
“RemarkPYQuanPin”: “”,
“StarFriend”: 0,
“AppAccountFlag”: 0,
“Statues”: 1,
“AttrStatus”: 0,
“Province”: “”,
“City”: “”,
“Alias”: “”,
“SnsFlag”: 0,
“UniFriend”: 0,
“DisplayName”: “”,
“ChatRoomId”: 0,
“KeyWord”: “”,
“EncryChatRoomId”: “”,
“IsOwner”: 1,
“IsAdmin”: null,
“Self”: {
“MemberList”: [],
“Uin”: 0,
“UserName”: “@863e84f7210f6ca0da17b018aedf81bcf8a173fee1d811d2769f101c4267e020”,
“NickName”: “成员3”,
“AttrStatus”: 16912485,
“PYInitial”: “”,
“PYQuanPin”: “”,
“RemarkPYInitial”: “”,
“RemarkPYQuanPin”: “”,
“MemberStatus”: 0,
“DisplayName”: “”,
“KeyWord”: “”
},
“HeadImgUpdateFlag”: 1,
“ContactType”: 0,
“ChatRoomOwner”: “@863e84f7210f6ca0da17b018aedf81bcf8a173fee1d811d2769f101c4267e020”
},
“Type”: “Text”,
“Text”: “我是消息”
}
(1) BaseMsg群消息的原格式:

package com.wtyy.dto;

import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import com.wtyy.module.WechatGroup;
import com.wtyy.module.WechatGroupMember;
import com.wtyy.module.WechatGroupMsg;

/**
 * Raw WeChat group message in its original wire format.
 *
 * <p>Field names start with an upper-case letter so that they match the keys of the
 * incoming JSON. The static helpers convert the raw objects into the application's
 * own entities.
 */
public class BaseMsg implements Serializable{

    protected static Logger Logger=LoggerFactory.getLogger(BaseMsg.class);

    private static final long serialVersionUID = -948732742619659228L;

    private String MsgId;

    private String FromUserName;

    private String ToUserName;

    private Integer MsgType;

    private String Content;

    private Integer Status;

    private Integer ImgStatus;

    // NOTE(review): the sample payload carries CreateTime as a 10-digit epoch value
    // (seconds?) — confirm the JSON binding yields the intended Date.
    private Date CreateTime;

    private Integer VoiceLength;

    private Integer PlayLength;

    private String FileName;

    private String FileSize;

    private String MediaId;

    private String Url;

    private Integer AppMsgType;

    private Integer StatusNotifyCode;

    private String StatusNotifyUserName;

    private Integer ForwardFlag;

    private String Ticket;

    private Integer SubMsgType;

    private String OriContent;

    private Boolean IsAt;

    /** Nickname of the sender. */
    private String ActualNickName;

    private String ActualUserName;

    private String Type;

    /** Same as {@code Content}. */
    private String Text;

    /** Recommendation payload. */
    private RecommendInfo RecommendInfo;

    /** The group the message belongs to, including its members. */
    private WechatGroupDTO User;

    /* All getters and setters omitted. */

    /**
     * Converts a raw group into a {@link WechatGroup}.
     *
     * <p>The owner's nickname is resolved by matching the group's {@code ChatRoomOwner}
     * user name against the member list; it stays unset when there is no owner, no
     * member list, or no matching member. The method name (including its spelling) is
     * kept for existing callers.
     *
     * @param wechatGroupDTO raw group, must not be {@code null}
     * @return the converted group; group id and robot id are not set here
     */
    public static WechatGroup toWehatGroup(WechatGroupDTO wechatGroupDTO) {
        WechatGroup wechatGroup = new WechatGroup();
        wechatGroup.setChatRoomId(wechatGroupDTO.getChatRoomId());
        wechatGroup.setGroupNickName(wechatGroupDTO.getNickName());
        wechatGroup.setGroupUserName(wechatGroupDTO.getUserName());
        wechatGroup.setHeadImgUrl(wechatGroupDTO.getHeadImgUrl());
        wechatGroup.setMemberCount(wechatGroupDTO.getMemberCount());

        String chatRoomOwner = wechatGroupDTO.getChatRoomOwner();
        List<Member> memberList = wechatGroupDTO.getMemberList();
        // A payload without a member list must not fail the whole conversion.
        if (!StringUtils.isEmpty(chatRoomOwner) && memberList != null) {
            for (Member member : memberList) {
                if (chatRoomOwner.equals(member.getUserName())) {
                    wechatGroup.setOwnerNickName(member.getNickName());
                    break;
                }
            }
        }

        return wechatGroup;
    }

    /**
     * Converts the members of a raw group into {@link WechatGroupMember}s.
     *
     * <p>Only the nickname and the user name are copied; {@code Uin} is not, because
     * {@link Member} does not expose it.
     *
     * @param wechatGroupDTO raw group, must not be {@code null}
     * @return a new mutable list, empty when the group has no member list
     */
    public static List<WechatGroupMember> toWechatGroupMembers(WechatGroupDTO wechatGroupDTO) {
        List<WechatGroupMember> members = new ArrayList<>();
        List<Member> membersObj = wechatGroupDTO.getMemberList();
        if (membersObj == null) {
            return members;
        }
        for (Member member : membersObj) {
            WechatGroupMember wechatGroupMember = new WechatGroupMember();
            wechatGroupMember.setNickName(member.getNickName());
            wechatGroupMember.setUserName(member.getUserName());
            members.add(wechatGroupMember);
        }
        return members;
    }

    /**
     * Converts a raw message into a {@link WechatGroupMsg}.
     *
     * @param msgObj  raw message, must not be {@code null}
     * @param groupId id of the group the message belongs to
     * @param robotId id of the robot that received the message
     * @return the converted message; {@code recommendInfo} is not copied
     */
    public static WechatGroupMsg toWechatGroupMsg(BaseMsg msgObj, String groupId,String robotId) {
        WechatGroupMsg wechatGroupMsg = new WechatGroupMsg();
        wechatGroupMsg.setActualNickName(msgObj.getActualNickName());
        wechatGroupMsg.setActualUserName(msgObj.getActualUserName());
        wechatGroupMsg.setContent(msgObj.getContent());
        wechatGroupMsg.setFileName(msgObj.getFileName());
        wechatGroupMsg.setFromUserName(msgObj.getFromUserName());
        wechatGroupMsg.setGroupId(groupId);
        wechatGroupMsg.setImgStatus(msgObj.getImgStatus());
        wechatGroupMsg.setMsgId(msgObj.getMsgId());
        wechatGroupMsg.setMsgType(msgObj.getMsgType());
        wechatGroupMsg.setRobotId(robotId);
        wechatGroupMsg.setStatus(msgObj.getStatus());
        wechatGroupMsg.setStatusNotifyCode(msgObj.getStatusNotifyCode());
        wechatGroupMsg.setStatusNotifyUserName(msgObj.getStatusNotifyUserName());
        wechatGroupMsg.setText(msgObj.getText());
        wechatGroupMsg.setType(msgObj.getType());
        wechatGroupMsg.setCreateTime(msgObj.getCreateTime());
        wechatGroupMsg.setUrl(msgObj.getUrl());
        return wechatGroupMsg;
    }

}
(2)微信消息体中的User对象:

package com.wtyy.dto;

import java.io.Serializable;
import java.util.List;

/**
* The group object carried in a raw WeChat message (held by the message's
* {@code User} field). Field names start with an upper-case letter to match the
* keys of the incoming JSON.
*/
public class WechatGroupDTO implements Serializable{

/**
* Serialization version identifier.
*/
private static final long serialVersionUID = 3919060806498433575L;

private List<Member> MemberList;

private Long Uin;

/**
* Group user name.
*/
private String UserName;

/**
* Group nickname.
*/
private String NickName;

/**
* Group avatar.
*/
private String HeadImgUrl;

private Integer ChatRoomId;

/**
* Number of group members.
*/
private Integer MemberCount;

/**
* Group owner (user name).
*/
private String ChatRoomOwner;

/**
* The currently logged-in user.
*/
private Member Self;

/* All getters and setters omitted. */
}
(3)、微信成员对象:

 

package com.wtyy.dto;
import java.io.Serializable;
/**
* A group member as carried in a raw WeChat message. Only the user name and the
* nickname are mapped. Accessor methods are omitted in this listing.
*/
public class Member implements Serializable{

private static final long serialVersionUID = 4300987291874273446L;

// Uin is deliberately left unmapped:
//private Integer Uin;

private String UserName;

private String NickName;

}
(4)RecommendInfo:

package com.wtyy.dto;

import java.io.Serializable;
/**
* The recommendation object carried in a raw WeChat message. Field names start
* with an upper-case letter to match the keys of the incoming JSON.
* Accessor methods are omitted in this listing.
*/
public class RecommendInfo implements Serializable {
/**
* Serialization version identifier.
*/
private static final long serialVersionUID = 1L;

private String Province;
private String Ticket;
private String UserName;
private int Sex;
private int AttrStatus;
private String City;
private String NickName;
private int Scene;
private String Content;
private String Alias;
private String Signature;
private int OpCode;
private int QQNum;
private int VerifyFlag;
}
3、kafka监听:(一般是写成一个单独的job,这里暂时和controller放在一个工程里面)

package com.wtyy.consume;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.security.spec.MGF1ParameterSpec;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import ch.qos.logback.core.db.dialect.MsSQLDialect;

import com.alibaba.fastjson.JSON;
import com.wtyy.dto.BaseMsg;
import com.wtyy.dto.WechatGroupDTO;
import com.wtyy.module.WechatGroup;
import com.wtyy.module.WechatGroupMember;
import com.wtyy.module.WechatGroupMsg;
import com.wtyy.service.WechatGroupMsgService;
import com.wtyy.service.WechatService;
import com.wtyy.util.RedisUtil;

/**
*
* @Description: kafka消息监听
*/
@Service
public class WechatGroupConsumer {

protected static Logger logger=LoggerFactory.getLogger(WechatGroupConsumer.class);

@Autowired
private WechatService wechatService;

@Autowired
private WechatGroupMsgService wechatGroupMsgService;

@Autowired
private RedisUtil redisClient;

//redis缓存过期时间,一天
private static final int EXPIRE_TIME = 60*60*24;

//机器昵称-机器id
private static final String WECHAT_ROBOT_NICKNAME_ID = “wechat:robot:{nickName}”;

//群昵称-群id
private static final String WECHAT_GROUP_NICKNAME_ID = “wechatGroup:{nickName}:{robotId}”;

/**
* 1、监听群和群成员
*/
@KafkaListener(topics = {“${wechat.group}”})
public void consumeGroup(ConsumerRecord<?, ?> consumerRecord) throws UnsupportedEncodingException{
String group = (String) consumerRecord.value();
/*List<WechatGroupDTO> groupList = JSONUtils.parseList(groups, WechatGroupDTO.class);

for(WechatGroupDTO wechatGroupDTO : groupList){
handleWechatGroup(wechatGroupDTO);
}*/
WechatGroupDTO wechatGroupDTO = null;
try{
wechatGroupDTO = JSON.parSEObject(group, WechatGroupDTO.class);
}catch (Exception e) {
logger.error(“群成员信息获取异常”);
}
handleWechatGroup(wechatGroupDTO);
}

/**
* 2、监听群消息
*/
@KafkaListener(topics = {“${wechat.msg}”})
public void consumeMsg(ConsumerRecord<?, ?> consumerRecord) throws UnsupportedEncodingException{

try{
String message = (String) consumerRecord.value();
System.err.println(message);
BaseMsg msgObj = JSON.parSEObject(message, BaseMsg.class);
//System.err.println(“时间”+msgObj.getCreateTime());
WechatGroupDTO wechatGroupDTO = msgObj.getUser();
//群昵称
String groupNickName = wechatGroupDTO.getNickName();
//当前登录用户昵称
String currentRobotNickName = wechatGroupDTO.getSelf().getNickName();

String robotId = redisClient.get(WECHAT_ROBOT_NICKNAME_ID, currentRobotNickName);
if(StringUtils.isEmpty(robotId)){
robotId = wechatService.getRobotIdByNickName(currentRobotNickName);
if(!StringUtils.isEmpty(robotId)){
redisClient.setex(WECHAT_ROBOT_NICKNAME_ID, EXPIRE_TIME, robotId, currentRobotNickName);
}else{
logger.error(“robotId为空”);
}
}

if(!StringUtils.isEmpty(robotId)){

//根据群昵称查询群是否存在
String groupId = redisClient.get(WECHAT_GROUP_NICKNAME_ID, groupNickName,robotId);
if(StringUtils.isEmpty(groupId)){
groupId = wechatService.getGroupIdByGroupNickName(groupNickName);
if(!StringUtils.isEmpty(groupId)){
redisClient.setex(WECHAT_GROUP_NICKNAME_ID, EXPIRE_TIME, groupId, groupNickName,robotId);
}
}

if(groupId == null){
//群不存在,插入群与群成员
groupId = addWechatGroupAndMembers(msgObj.getUser(),robotId,groupId);
redisClient.setex(WECHAT_GROUP_NICKNAME_ID, EXPIRE_TIME, groupId, groupNickName,robotId);

}

//插入群消息
WechatGroupMsg wechatGroupMsg = BaseMsg.toWechatGroupMsg(msgObj,groupId,robotId);
wechatGroupMsgService.addGroupMsg(wechatGroupMsg);
}

}catch (Exception e) {
logger.error(e+e.getMessage());
}
}

/**
* @author: tingzhang7
* @param msgObj 原生消息DTO
* @param robotId 机器id
* @param groupId groupId,为空则新增群
* @Description: 新增群
* @param 返回群id
*/
private String addWechatGroupAndMembers(WechatGroupDTO wechatGroupDTO, String robotId,String groupId) {

if(groupId == null){
//新增群
groupId = UUID.randomUUID().toString();
WechatGroup wechatGroup = BaseMsg.toWehatGroup(wechatGroupDTO);
wechatGroup.setGroupId(groupId);
wechatGroup.setRobotId(robotId);
wechatService.addWechatGroup(wechatGroup);
}

//新增群成员
List<WechatGroupMember> groupMembers = BaseMsg.toWechatGroupMembers(wechatGroupDTO);
wechatService.batchInsertWechatGroupMember(groupMembers, groupId);
return groupId;
}

/**
* @author: tingzhang7
* @Description: 处理群
*/
private void handleWechatGroup(WechatGroupDTO wechatGroupDTO) {

if(wechatGroupDTO == null){
logger.error(“群解析后为空”);
return ;
}
String groupNickName = wechatGroupDTO.getNickName();
if(StringUtils.isEmpty(groupNickName)){
logger.error(“群名称为空”);
return;
}
//机器昵称
String robotNickName = wechatGroupDTO.getSelf().getNickName();

//获取机器id
String robotId = redisClient.get(WECHAT_ROBOT_NICKNAME_ID, robotNickName);
if(StringUtils.isEmpty(robotId)){
robotId = wechatService.getRobotIdByNickName(robotNickName);
if(!StringUtils.isEmpty(robotId)){
redisClient.setex(WECHAT_ROBOT_NICKNAME_ID, EXPIRE_TIME, robotId, robotNickName);
}else{
logger.error(“获取不到机器robotId “+robotNickName);
return ;
}
}

//判断该群有没有记录过
String groupId = redisClient.get(WECHAT_GROUP_NICKNAME_ID, groupNickName,robotId);
if(StringUtils.isEmpty(groupId)){
groupId = wechatService.getGroupIdByGroupNickName(groupNickName);
if(!StringUtils.isEmpty(groupId)){
redisClient.setex(WECHAT_GROUP_NICKNAME_ID, EXPIRE_TIME, groupId, groupNickName,robotId);
}
}
//新增群
if(groupId == null){
//群不存在,插入群与群成员
groupId = addWechatGroupAndMembers(wechatGroupDTO,robotId,groupId);
redisClient.setex(WECHAT_GROUP_NICKNAME_ID, EXPIRE_TIME, groupId, groupNickName,robotId);
}else{
//群存在,更新群成员(按照nickName比较)
List<WechatGroupMember> oldMembers = wechatService.getMembersByGroupId(groupId);
List<WechatGroupMember> newMembers = BaseMsg.toWechatGroupMembers(wechatGroupDTO);
if(newMembers.size() != oldMembers.size()){
wechatService.updateWechatGroupSize(groupId,newMembers.size());
}
//newMembers备份
List<WechatGroupMember> copyOfNewMembers = BaseMsg.toWechatGroupMembers(wechatGroupDTO);
//新增的 = newMembers-old,newMembers为新增的差集
newMembers.removeAll(oldMembers);

List<WechatGroupMember> members = new ArrayList<>();
for(WechatGroupMember member : newMembers){
members.add(member);
if(members.size() == 200){
wechatService.batchInsertWechatGroupMember(members, groupId);
members.clear();
}
}
if(!CollectionUtils.isEmpty(members)){
wechatService.batchInsertWechatGroupMember(members, groupId);
}

//减少的 = old-copyOfNewMembers,oldMembers为减少的差集
oldMembers.removeAll(copyOfNewMembers);
List<Integer> ids = new ArrayList<>();
for(WechatGroupMember groupMember : oldMembers){
ids.add(groupMember.getId());
if(ids.size() == 200){
wechatService.batchDeleteWechatGroupMember(ids,groupId);
ids.clear();
}
}
if(!CollectionUtils.isEmpty(ids)){
wechatService.batchDeleteWechatGroupMember(ids,groupId);
}
}
}

}
4、启动类:

package com.wtyy;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.alibaba.dubbo.config.spring.context.annotation.EnableDubboConfig;
import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;

@SpringBootApplication
@EnableDubboConfiguration
@ImportResource(“classpath:consumer.xml”)
@EnableScheduling
public class RestStart {
public static void main(String[] args) {
SpringApplication.run(RestStart.class, args);
}

}
五、最后看下Python脚本:

import itchat, time
from itchat.content import *
from kafka import KafkaProducer
import json
import threading
import oss2
import os
import uuid
# Kafka producer shared by all handlers below; every value is serialised to
# JSON and encoded as UTF-8 before being sent to the local broker.
producer = KafkaProducer(
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    bootstrap_servers=['localhost:9092']
)

# Fetch the chatroom list and publish every room's details to Kafka.
def getChatRooms():
    """Publish each chatroom's detailed info to the 'wechatgroup' topic.

    Refreshes the chatroom list, dumps the login status, sends the detailed
    record of every room to Kafka, and then re-arms a 10.5 second timer that
    calls this function again.
    """
    try:
        # Named `chatrooms` rather than `list` so the builtin is not shadowed.
        chatrooms = itchat.get_chatrooms(update=True)
        itchat.dump_login_status()
        for room in chatrooms:
            # An earlier (disabled) variant only called update_chatroom when
            # room['MemberCount'] == 0 and sent the raw list entry otherwise;
            # details are now refreshed for every room.
            memberList = itchat.update_chatroom(room['UserName'], detailedMember=True)
            producer.send('wechatgroup', memberList)
    finally:
        # Re-arm in `finally` so one failed poll does not stop the periodic refresh.
        timer = threading.Timer(10.5, getChatRooms)
        timer.start()

# Receive group text messages.
@itchat.msg_register(TEXT, isGroupChat=True)
def text_reply(msg):
    """Forward a group text message to the 'wechatmsg' Kafka topic."""
    print("文本消息")
    producer.send('wechatmsg', msg)
    # Disabled auto-reply when the robot is @-mentioned, kept for reference:
    # if msg.isAt:
    #     msg.user.send(u'@%s\u2005I received: %s' % (
    #         msg.actualNickName, msg.text))

# Receive group sharing messages.
@itchat.msg_register([SHARING], isGroupChat=True)
def recieve_sharing(msg):
    """Forward a group sharing message to the 'wechatmsg' Kafka topic."""
    print("分享消息")
    producer.send('wechatmsg', msg)

# Receive group file messages (pictures, recordings, attachments, videos).
@itchat.msg_register([PICTURE, RECORDING, ATTACHMENT, VIDEO], isGroupChat=True)
def download_files(msg):
    """Download the attached file, upload it, and forward the message.

    The file is saved under msg.fileName, uploaded through uploadFile, and the
    message is sent to the 'wechatmsg' topic with 'Text' blanked and 'Url' set
    to the value uploadFile returned.
    """
    msg.download(msg.fileName)
    url = uploadFile(msg.fileName)
    msg['Text'] = ""
    msg['Url'] = url
    producer.send('wechatmsg', msg)

# Upload a file to Aliyun OSS and return its URL.
def uploadFile(fileName):
    """Upload `fileName` from the current working directory to OSS.

    Returns a signed GET URL valid for 24 hours when the upload responds with
    status 200, otherwise an empty string. The local file is removed only
    after a successful upload.
    """
    url = ''
    # NOTE: `xxx` stands for the redacted access key id / secret / bucket name
    # in the original post; replace with real values before running.
    auth = oss2.Auth(xxx, xxx)
    endpoint = 'https://oss-cn-hangzhou.aliyuncs.com'
    bucket = oss2.Bucket(auth, endpoint, xxx, connect_timeout=30)
    # os.path.join instead of manual separator concatenation.
    fileUrl = os.path.join(os.getcwd(), fileName)
    result = bucket.put_object_from_file(fileName, fileUrl)

    if result.status == 200:
        # Signed URL for the object; the last argument is the expiry in seconds.
        url = bucket.sign_url('GET', fileName, 60 * 60 * 24)
        print('上传成功')
        # Delete the local copy once the upload has succeeded.
        os.remove(fileUrl)
    return url

# NOTE(review): this reuses the name `text_reply` of the group-text handler
# defined earlier and rebinds it at module level; presumably harmless because
# handlers are registered through the decorator — confirm.
@itchat.msg_register([TEXT, MAP, CARD, NOTE, SHARING])
def text_reply(msg):
    """Echo the message type and text back to the sender."""
    msg.user.send('%s: %s' % (msg.type, msg.text))

@itchat.msg_register(FRIENDS)
def add_friend(msg):
    """Verify an incoming friend request and send a greeting."""
    msg.user.verify()
    msg.user.send('Nice to meet you!')

# Look up a chatroom's UserName by its nickname.
def get_group_userName(group_name):
    """Return the 'UserName' of the first chatroom matching `group_name`.

    Raises:
        IndexError: if the search returns no chatroom.
    """
    group_list = itchat.search_chatrooms(name=group_name)
    if not group_list:
        # Same exception type as indexing an empty list, with a usable message.
        raise IndexError('no chatroom found for name %r' % (group_name,))
    return group_list[0]['UserName']

# Look up a friend's UserName by nickname.
def get_friend_userName(friend_name):
    """Return the 'UserName' of the first friend matching `friend_name`.

    Raises:
        IndexError: if the search returns no friend.
    """
    friend = itchat.search_friends(name=friend_name)
    if not friend:
        # Same exception type as indexing an empty list, with a usable message.
        raise IndexError('no friend found for name %r' % (friend_name,))
    return friend[0]['UserName']

# Add a friend to a chatroom automatically; both arguments are nicknames.
def addFriendToChatRoom(group_name, friend_name):
    """Resolve both nicknames to UserNames and add the friend to the group.

    The friend is added with useInvitation=False; the returned status is
    printed.
    """
    groupUserName = get_group_userName(group_name)
    print("群" + groupUserName)
    friendUserName = get_friend_userName(friend_name)
    print("好友" + friendUserName)
    status = itchat.add_member_into_chatroom(
        groupUserName,
        [{'UserName': friendUserName}],
        useInvitation=False)
    print("状态")
    print(status)

# Multi-user login: each process stores its login status in its own
# randomly named .pkl file.
instanceDir = '%s.pkl' % uuid.uuid4()
itchat.auto_login(hotReload=True, enableCmdQR=True, statusStorageDir=instanceDir)
# Start the periodic chatroom refresh two seconds after login.
timer = threading.Timer(2, getChatRooms)
timer.start()
itchat.run(True)

注:将用户加入群聊有直接加入和发送邀请两种方式,通过 useInvitation 设置:True 为发送邀请,False 为直接加入;超过40人的群只能使用邀请的方式。

********************************************************分割线********************************************************************************

好了,现在启动本地redis、kafka后,再分别启动service项目和rest项目。

准备扫码登录:

登录成功后可以看到会自动生成一个.pkl的文件:

就是这个文件缓存了登录状态,所以这个文件的名称我生成了一个随机数,否则

itchat.auto_login(hotReload=True,enableCmdQR =True)
这样的话只允许一个用户登录。登录后就可以定时的更新群成员、获取各类群消息了:

 

————————————————
版权声明:本文为CSDN博主「w_t_y_y」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/w_t_y_y/article/details/89512517

赞(0) 打赏
分享到: 更多 (0)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏