Commit 0165295b by 邹磊浩

websocket完善

parent e41ea363
package com.pz.web.controller.system; package com.pz.web.controller.system;
import cn.dev33.satoken.annotation.SaCheckPermission; import cn.dev33.satoken.annotation.SaCheckPermission;
import cn.dev33.satoken.annotation.SaIgnore;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.pz.common.annotation.Log; import com.pz.common.annotation.Log;
import com.pz.common.annotation.RepeatSubmit; import com.pz.common.annotation.RepeatSubmit;
import com.pz.common.core.controller.BaseController; import com.pz.common.core.controller.BaseController;
import com.pz.common.core.domain.PageQuery;
import com.pz.common.core.domain.R; import com.pz.common.core.domain.R;
import com.pz.common.core.domain.entity.SysUser; import com.pz.common.core.domain.entity.SysUser;
import com.pz.common.core.page.TableDataInfo;
import com.pz.common.core.validate.AddGroup; import com.pz.common.core.validate.AddGroup;
import com.pz.common.enums.BusinessType; import com.pz.common.enums.BusinessType;
import com.pz.system.domain.SessionList; import com.pz.system.domain.SessionList;
import com.pz.system.domain.bo.SessionListBo; import com.pz.system.domain.bo.SessionListBo;
import com.pz.system.domain.vo.SessionListVo;
import com.pz.system.mapper.SessionListMapper; import com.pz.system.mapper.SessionListMapper;
import com.pz.system.mapper.SysUserMapper; import com.pz.system.mapper.SysUserMapper;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
...@@ -27,37 +33,47 @@ import javax.validation.constraints.NotEmpty; ...@@ -27,37 +33,47 @@ import javax.validation.constraints.NotEmpty;
@Validated @Validated
@RequiredArgsConstructor @RequiredArgsConstructor
@RestController @RestController
@RequestMapping("/system/list") @RequestMapping("/system/session")
public class SessionListController extends BaseController { public class SessionListController extends BaseController {
private final SessionListMapper sessionListMapper; private final SessionListMapper sessionListMapper;
private final SysUserMapper sysUserMapper; /**
* 查询会话列列表
*/
@GetMapping("/list")
public TableDataInfo<SessionListVo> list(SessionListBo bo, PageQuery pageQuery) {
return sessionListMapper.selectVoPage(pageQuery.build(), Wrappers.<SessionList>lambdaQuery().eq(SessionList::getUserId, getUserId()));
}
/** /**
* 创建会话 * 创建会话
*/ */
@SaCheckPermission("system:list:add")
@Log(title = "会话列", businessType = BusinessType.INSERT) @Log(title = "会话列", businessType = BusinessType.INSERT)
@RepeatSubmit() @RepeatSubmit()
@PostMapping() @PostMapping("/createSession")
@SaIgnore
@Transactional
public R<Void> add(@Validated(AddGroup.class) @RequestBody SessionListBo bo) { public R<Void> add(@Validated(AddGroup.class) @RequestBody SessionListBo bo) {
// 创建会话列表对象
SessionList sessionList = new SessionList(); SessionList sessionList = new SessionList();
sessionList.setUserId(bo.getUserId()); sessionList.setUserId(bo.getUserId());
sessionList.setUnReadCount(0L); sessionList.setUnReadCount(0L);
sessionList.setListName(bo.getToUserName());
sessionList.setToUserId(bo.getToUserId()); sessionList.setToUserId(bo.getToUserId());
// 插入会话列表记录
sessionListMapper.insert(sessionList); sessionListMapper.insert(sessionList);
// 判断对方和我建立会话没有? 没有也要建立
Integer SessionId = sessionListMapper.selectIdByUser(bo.getToUserId(), bo.getUserId()); // 判断对方和我建立会话没有,如果没有则建立
if (SessionId == null || SessionId <= 0) { boolean hasSession = sessionListMapper.selectIdByUser(bo.getToUserId(), bo.getUserId()) > 0;
SysUser user = sysUserMapper.selectUserById(bo.getUserId()); if (!hasSession) {
sessionList.setUserId(bo.getToUserId()); // 新建会话列表对象
sessionList.setToUserId(bo.getUserId()); SessionList newSessionList = new SessionList();
sessionList.setListName(user.getNickName()); newSessionList.setUserId(bo.getToUserId());
sessionListMapper.insert(sessionList); newSessionList.setToUserId(bo.getUserId());
// 插入新的会话列表记录
sessionListMapper.insert(newSessionList);
} }
return R.ok(); return R.ok();
} }
...@@ -68,11 +84,11 @@ public class SessionListController extends BaseController { ...@@ -68,11 +84,11 @@ public class SessionListController extends BaseController {
* *
* @param id 主键 * @param id 主键
*/ */
@SaCheckPermission("system:list:remove")
@Log(title = "会话列", businessType = BusinessType.DELETE) @Log(title = "会话列", businessType = BusinessType.DELETE)
@DeleteMapping("/{id}") @DeleteMapping("/{id}")
public R<Void> remove(@NotEmpty(message = "主键不能为空") public R<Void> remove(@NotEmpty(message = "主键不能为空")
@PathVariable Long id) { @PathVariable Long id) {
return toAjax(sessionListMapper.deleteById(id)); return toAjax(sessionListMapper.deleteById(id));
} }
} }
...@@ -4,9 +4,12 @@ import com.alibaba.fastjson.JSON; ...@@ -4,9 +4,12 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringUtils; import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.pz.common.core.domain.entity.SysUser;
import com.pz.common.utils.DateUtils; import com.pz.common.utils.DateUtils;
import com.pz.system.domain.Message; import com.pz.system.domain.Message;
import com.pz.system.domain.SessionList;
import com.pz.system.mapper.MessageMapper; import com.pz.system.mapper.MessageMapper;
import com.pz.system.mapper.SessionListMapper;
import com.pz.system.mapper.SysUserMapper; import com.pz.system.mapper.SysUserMapper;
import com.pz.system.service.ISysUserService; import com.pz.system.service.ISysUserService;
import com.pz.web.controller.websocket.SpringContextUtil; import com.pz.web.controller.websocket.SpringContextUtil;
...@@ -19,7 +22,9 @@ import javax.websocket.*; ...@@ -19,7 +22,9 @@ import javax.websocket.*;
import javax.websocket.server.PathParam; import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
...@@ -43,6 +48,8 @@ public class WebSocketOneToOneController { ...@@ -43,6 +48,8 @@ public class WebSocketOneToOneController {
private static int onlineCount; private static int onlineCount;
//实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key为用户标识 //实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key为用户标识
private static final Map<Long, WebSocketOneToOneController> connections = new ConcurrentHashMap<>(); private static final Map<Long, WebSocketOneToOneController> connections = new ConcurrentHashMap<>();
public static Map<Long, List<Object>> sessionPool = new ConcurrentHashMap<>();
// 与某个客户端的连接会话,需要通过它来给客户端发送数据 // 与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session; private Session session;
private Long sendId; private Long sendId;
...@@ -51,6 +58,9 @@ public class WebSocketOneToOneController { ...@@ -51,6 +58,9 @@ public class WebSocketOneToOneController {
@Resource @Resource
private MessageMapper messageMapper; private MessageMapper messageMapper;
@Resource
private SessionListMapper sessionListMapper;
/** /**
* 连接建立成功调用的方法 * 连接建立成功调用的方法
...@@ -62,7 +72,11 @@ public class WebSocketOneToOneController { ...@@ -62,7 +72,11 @@ public class WebSocketOneToOneController {
this.session = session; this.session = session;
this.sendId = sendId; //用户标识 this.sendId = sendId; //用户标识
this.roomId = roomId; //会话标识 this.roomId = roomId; //会话标识
List<Object> list = new ArrayList<>();
list.add(roomId);
list.add(session);
connections.put(sendId, this); //添加到map中 connections.put(sendId, this); //添加到map中
sessionPool.put(sendId, list); //创建会话
addOnlineCount(); // 在线数加 addOnlineCount(); // 在线数加
log.info("sendId:" + sendId + "roomId:" + roomId); log.info("sendId:" + sendId + "roomId:" + roomId);
System.out.println(this.session); System.out.println(this.session);
...@@ -74,7 +88,7 @@ public class WebSocketOneToOneController { ...@@ -74,7 +88,7 @@ public class WebSocketOneToOneController {
*/ */
@OnClose @OnClose
public void onClose() { public void onClose() {
connections.remove(sendId); // 从map中移除 connections.remove(sendId); // 从map中移除
subOnlineCount(); // 在线数减 subOnlineCount(); // 在线数减
System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount()); System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
} }
...@@ -87,18 +101,17 @@ public class WebSocketOneToOneController { ...@@ -87,18 +101,17 @@ public class WebSocketOneToOneController {
*/ */
@OnMessage @OnMessage
public void onMessage(String message, Session session) { public void onMessage(String message, Session session) {
String sessionId = this.session.getPathParameters().get("roomId");
System.out.println("来自客户端的消息:" + message); System.out.println("来自客户端的消息:" + message);
JSONObject json = JSON.parseObject(message); JSONObject json = JSON.parseObject(message);
String msg = json.getString("content"); //需要发送的信息 String msg = json.getString("content"); // 需要发送的信息
String requestId = json.getString("requestId"); String requestId = json.getString("requestId");
int msgType = json.getIntValue("messageType"); int msgType = json.getIntValue("messageType");
String lastMessageTime = null; String lastMessageTime = json.getString("lastMessageTime"); // 使用类型推断,如果不存在键"lastMessageTime",则为null
if (json.containsKey("lastMessageTime")) { Long receiveId = json.getLong("receiveId"); // 发送对象的用户标识(接收者)
lastMessageTime = json.getString("lastMessageTime");
} // 发送消息
Long giftId = null; send(msg, sendId, receiveId, roomId, msgType, requestId, lastMessageTime, sessionId);
Long receiveId = json.getLong("receiveId"); //发送对象的用户标识(接收者)
send(msg, sendId, receiveId, roomId, msgType, requestId, lastMessageTime);
} }
/** /**
...@@ -113,6 +126,34 @@ public class WebSocketOneToOneController { ...@@ -113,6 +126,34 @@ public class WebSocketOneToOneController {
error.printStackTrace(); error.printStackTrace();
} }
private void sendMessage(SessionList sessionList, Message message) {
int res = messageMapper.insert(message);
if (res == 1) {
message.setStatus("-1");
}
WebSocketOneToOneController con = connections.get(message.getReceiver());
if (con != null && roomId.equals(con.roomId)) {
try {
con.session.getBasicRemote().sendText(JSON.toJSONString(message));
message.setIsRead("1");
messageMapper.updateById(message);
} catch (IOException e) {
// 处理发送消息异常
e.printStackTrace();
}
}
WebSocketOneToOneController confrom = connections.get(message.getSender());
if (confrom != null && roomId.equals(confrom.roomId)) {
try {
confrom.session.getBasicRemote().sendText(JSON.toJSONString(message));
} catch (IOException e) {
// 处理发送消息异常
e.printStackTrace();
}
}
}
/** /**
* @param msg 消息内容 * @param msg 消息内容
...@@ -123,7 +164,7 @@ public class WebSocketOneToOneController { ...@@ -123,7 +164,7 @@ public class WebSocketOneToOneController {
* @param requestId 消息请求ID * @param requestId 消息请求ID
* @param lastMessageTime 最后一次的消息时间 * @param lastMessageTime 最后一次的消息时间
*/ */
public void send(String msg, Long sendId, Long receiveId, String roomId, int msgType, String requestId, String lastMessageTime) { public void send(String msg, Long sendId, Long receiveId, String roomId, int msgType, String requestId, String lastMessageTime, String sessionId) {
Message message = new Message(); Message message = new Message();
message.setContent(msg); message.setContent(msg);
Date now = new Date(); Date now = new Date();
...@@ -133,6 +174,7 @@ public class WebSocketOneToOneController { ...@@ -133,6 +174,7 @@ public class WebSocketOneToOneController {
message.setIsRead("0"); message.setIsRead("0");
message.setRequestId(requestId); message.setRequestId(requestId);
message.setType(0); message.setType(0);
if (StringUtils.isNotBlank(lastMessageTime)) { if (StringUtils.isNotBlank(lastMessageTime)) {
Date lastTime = DateUtils.stringToDate(lastMessageTime, "yyyy-MM-dd HH:mm"); Date lastTime = DateUtils.stringToDate(lastMessageTime, "yyyy-MM-dd HH:mm");
long minute = (now.getTime() - lastTime.getTime()) / 1000 / 60; long minute = (now.getTime() - lastTime.getTime()) / 1000 / 60;
...@@ -141,34 +183,47 @@ public class WebSocketOneToOneController { ...@@ -141,34 +183,47 @@ public class WebSocketOneToOneController {
message.setType(1); message.setType(1);
} }
} }
if (messageMapper == null) { if (messageMapper == null) {
this.messageMapper = (MessageMapper) SpringContextUtil.getBean("messageMapper"); this.messageMapper = (MessageMapper) SpringContextUtil.getBean("messageMapper");
} }
if (sessionListMapper == null) {
this.sessionListMapper = (SessionListMapper) SpringContextUtil.getBean("sessionListMapper");
}
//获取本次会话信息
SessionList sessionList = sessionListMapper.selectById(Integer.parseInt(sessionId));
List<Object> list = sessionPool.get(sessionList.getToUserId());
try { try {
int res = messageMapper.insert(message); if (list == null || list.isEmpty()) {
if (res == 1) { //增加对方未读数
message.setStatus("-1"); sessionListMapper.addUnReadCount(receiveId, sendId);
} } else {
//发送 Integer toId = Integer.valueOf((String) list.get(0));
WebSocketOneToOneController con = connections.get(receiveId); if (Integer.valueOf(sessionId).equals(toId)) {
if (con != null) { //彼此都在会话中,直接发送消息
if (roomId.equals(con.roomId)) { sendMessage(sessionList, message);
con.session.getBasicRemote().sendText(JSON.toJSONString(message)); } else {
message.setIsRead("1"); if (sessionId == null) {
messageMapper.updateById(message); //创建会话
} SessionList tmpSessionList = new SessionList();
} tmpSessionList.setUserId(sessionList.getToUserId());
//通知发送消息的用户,消息已经发送成功 tmpSessionList.setToUserId(sessionList.getUserId());
WebSocketOneToOneController confrom = connections.get(sendId); tmpSessionList.setUnReadCount(1L);
if (confrom != null) { sessionListMapper.insert(tmpSessionList);
if (roomId.equals(confrom.roomId)) { } else {
confrom.session.getBasicRemote().sendText(JSON.toJSONString(message)); //增加对方未读数
sessionListMapper.addUnReadCount(receiveId, sendId);
}
sendMessage(sessionList, message);
} }
} }
} catch (Exception e) {
} catch (IOException e) { // 处理发送消息异常
e.printStackTrace(); e.printStackTrace();
} }
} }
public static synchronized int getOnlineCount() { public static synchronized int getOnlineCount() {
......
...@@ -32,10 +32,6 @@ public class SessionList { ...@@ -32,10 +32,6 @@ public class SessionList {
*/ */
private Long toUserId; private Long toUserId;
/** /**
* 会话名称
*/
private String listName;
/**
* 未读消息数 * 未读消息数
*/ */
private Long unReadCount; private Long unReadCount;
......
...@@ -63,7 +63,6 @@ public class SessionListServiceImpl implements ISessionListService { ...@@ -63,7 +63,6 @@ public class SessionListServiceImpl implements ISessionListService {
LambdaQueryWrapper<SessionList> lqw = Wrappers.lambdaQuery(); LambdaQueryWrapper<SessionList> lqw = Wrappers.lambdaQuery();
lqw.eq(bo.getUserId() != null, SessionList::getUserId, bo.getUserId()); lqw.eq(bo.getUserId() != null, SessionList::getUserId, bo.getUserId());
lqw.eq(bo.getToUserId() != null, SessionList::getToUserId, bo.getToUserId()); lqw.eq(bo.getToUserId() != null, SessionList::getToUserId, bo.getToUserId());
lqw.like(StringUtils.isNotBlank(bo.getListName()), SessionList::getListName, bo.getListName());
lqw.eq(bo.getUnReadCount() != null, SessionList::getUnReadCount, bo.getUnReadCount()); lqw.eq(bo.getUnReadCount() != null, SessionList::getUnReadCount, bo.getUnReadCount());
return lqw; return lqw;
} }
......
...@@ -8,7 +8,6 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" ...@@ -8,7 +8,6 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<result property="id" column="id"/> <result property="id" column="id"/>
<result property="userId" column="user_id"/> <result property="userId" column="user_id"/>
<result property="toUserId" column="to_user_id"/> <result property="toUserId" column="to_user_id"/>
<result property="listName" column="list_name"/>
<result property="unReadCount" column="un_read_count"/> <result property="unReadCount" column="un_read_count"/>
</resultMap> </resultMap>
<update id="addUnReadCount"> <update id="addUnReadCount">
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment