package com.pz.applet; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.baomidou.mybatisplus.core.toolkit.StringUtils; import com.pz.common.utils.DateUtils; import com.pz.system.domain.Message; import com.pz.system.domain.SessionList; import com.pz.system.mapper.MessageMapper; import com.pz.system.mapper.SessionListMapper; import com.pz.websocket.SpringContextUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @author lisw * @program ly-project * @description 聊天 * @createDate 2021-05-30 11:32:39 * <p> * 描述: * 一对一聊天 * @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端, * 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端 */ @Component @Slf4j @ServerEndpoint("/webSocketOneToOne/{sendId}/{roomId}") public class WebSocketOneToOneController { // 静态变量,用来记录当前在线连接数· private static int onlineCount; //实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key为用户标识 private static final Map<Long, WebSocketOneToOneController> connections = new ConcurrentHashMap<>(); public static Map<Long, List<Object>> sessionPool = new ConcurrentHashMap<>(); // 与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; private Long sendId; private String roomId; @Resource private MessageMapper messageMapper; @Resource private SessionListMapper sessionListMapper; /** * 连接建立成功调用的方法 * * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据 */ @OnOpen public void onOpen(@PathParam("sendId") Long sendId, @PathParam("roomId") String roomId, Session session) { this.session = session; this.sendId = sendId; //用户标识 this.roomId = roomId; //会话标识 List<Object> list = new ArrayList<>(); list.add(roomId); list.add(session); connections.put(sendId, this); //添加到map中 sessionPool.put(sendId, list); //创建会话 addOnlineCount(); // 在线数加 log.info("sendId:" + sendId + "roomId:" + roomId); System.out.println(this.session); System.out.println("有新连接加入!新用户:" + sendId + ",当前在线人数为" + getOnlineCount()); } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { connections.remove(sendId); // 从map中移除 subOnlineCount(); // 在线数减 System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 * @param session 可选的参数 */ @OnMessage public void onMessage(String message, Session session) { String sessionId = this.session.getPathParameters().get("roomId"); System.out.println("来自客户端的消息:" + message); JSONObject json = JSON.parseObject(message); String msg = json.getString("content"); // 需要发送的信息 String requestId = json.getString("requestId"); int msgType = json.getIntValue("messageType"); String lastMessageTime = json.getString("lastMessageTime"); // 使用类型推断,如果不存在键"lastMessageTime",则为null Long receiveId = json.getLong("receiveId"); // 发送对象的用户标识(接收者) String createTime = json.getString("createTime"); // 发送消息 send(msg, sendId, receiveId, roomId, msgType, requestId, lastMessageTime, sessionId, createTime); } /** * 发生错误时调用 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { System.out.println("发生错误"); error.printStackTrace(); } private void sendMessage(SessionList sessionList, Message message) { int res = messageMapper.insert(message); if (res == 1) { message.setStatus("-1"); } WebSocketOneToOneController con = connections.get(message.getReceiver()); if (con != null && roomId.equals(con.roomId)) { try { con.session.getBasicRemote().sendText(JSON.toJSONString(message)); message.setIsRead("1"); messageMapper.updateById(message); } catch (IOException e) { // 处理发送消息异常 e.printStackTrace(); } } WebSocketOneToOneController confrom = connections.get(message.getSender()); if (confrom != null && roomId.equals(confrom.roomId)) { try { confrom.session.getBasicRemote().sendText(JSON.toJSONString(message)); } catch (IOException e) { // 处理发送消息异常 e.printStackTrace(); } } } /** * @param msg 消息内容 * @param sendId 发送人 * @param receiveId 接收人 * @param roomId 房间ID * @param msgType 消息类型 * @param requestId 消息请求ID * @param lastMessageTime 最后一次的消息时间 */ public void send(String msg, Long sendId, Long receiveId, String roomId, int msgType, String requestId, String lastMessageTime, String sessionId, String createTime) { Message message = new Message(); message.setContent(msg); Date now = new Date(); message.setReceiver(receiveId); message.setSender(sendId); message.setContentType(msgType); message.setIsRead("0"); message.setRequestId(requestId); message.setType(0); message.setCreateTime(new Date()); if (messageMapper == null) { this.messageMapper = (MessageMapper) SpringContextUtil.getBean("messageMapper"); } if (sessionListMapper == null) { this.sessionListMapper = (SessionListMapper) SpringContextUtil.getBean("sessionListMapper"); } //获取本次会话信息 SessionList sessionList = sessionListMapper.selectById(Integer.parseInt(sessionId)); List<Object> list = sessionPool.get(sessionList.getToUserId()); try { if (CollectionUtils.isEmpty(list)) { //增加对方未读数 sessionListMapper.addUnReadCount(receiveId, sendId); //发送消息 sendMessage(sessionList, message); } else { String o = (String) list.get(0); if (StringUtils.isNotBlank(o) && !o.equals("null")) { Integer toId = Integer.valueOf(o); if (Integer.valueOf(sessionId).equals(toId)) { //彼此都在会话中,直接发送消息 sendMessage(sessionList, message); } } else { if (sessionId == null) { //创建会话 SessionList tmpSessionList = new SessionList(); tmpSessionList.setUserId(sessionList.getToUserId()); tmpSessionList.setToUserId(sessionList.getUserId()); tmpSessionList.setUnReadCount(1L); sessionListMapper.insert(tmpSessionList); } else { //增加对方未读数 sessionListMapper.addUnReadCount(receiveId, sendId); } sendMessage(sessionList, message); } } } catch (Exception e) { // 处理发送消息异常 e.printStackTrace(); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketOneToOneController.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketOneToOneController.onlineCount--; } }