go小型通讯系统


项目开发流程

需求分析–> 设计阶段—> 编码实现 –> 测试阶段–>实施

采用的原理:优化效率用缓存和算法,优化程序结构用分层

需求分析

  1. 用户注册

  2. 用户登录

  3. 显示在线用户列表

  4. 群聊(广播)

  5. 点对点聊天

  6. 离线留言

项目结构设计

​ 使用Goland工具 操作,创建项目chatroom,新建一下目录

项目开发前技术准备

项目要保存用户信息和消息数据,选择Redis ,Golang** 中使用 Redis.

客户端登录菜单

功能:能够正确的显示客户端的菜单。界面:

代码实现:

client/main/main.go

package main

import (
    "chatroom/client/process"
    "fmt"
    "os"
)

//定义两个变量,一个表示用户id, 一个表示用户密码
var userId int
var userPwd string
var userName string

func main() {

    //接收用户的选择
    var key int
    //判断是否还继续显示菜单
    //var loop = true

    for true {
        fmt.Println("----------------欢迎登陆多人聊天系统------------")
        fmt.Println("\t\t\t 1 登陆聊天室")
        fmt.Println("\t\t\t 2 注册用户")
        fmt.Println("\t\t\t 3 退出系统")
        fmt.Println("\t\t\t 请选择(1-3):")

        fmt.Scanf("%d\n", &key)
        switch key {
        case 1:
            fmt.Println("登陆聊天室")
            fmt.Println("请输入用户的id")
            fmt.Scanf("%d\n", &userId)
            fmt.Println("请输入用户的密码")
            fmt.Scanf("%s\n", &userPwd)
            // 完成登录
            //1. 创建一个UserProcess的实例
            up := &process.UserProcess{}
            up.Login(userId, userPwd)
        case 2:
            fmt.Println("注册用户")
            fmt.Println("请输入用户id:")
            fmt.Scanf("%d\n", &userId)
            fmt.Println("请输入用户密码:")
            fmt.Scanf("%s\n", &userPwd)
            fmt.Println("请输入用户名字(nickname):")
            fmt.Scanf("%s\n", &userName)
            //2. 调用UserProcess,完成注册的请求、
            up := &process.UserProcess{}
            up.Register(userId, userPwd, userName)
        case 3:
            fmt.Println("退出系统")
            //loop = false
            os.Exit(0)
        default:
            fmt.Println("你的输入有误,请重新输入")
        }

    }
}

client/login.go

package main

import (
    "chatroom/common/message"
    "encoding/binary"
    "encoding/json"
    "fmt"
    "net"
)
//写一个函数,完成登录
func login(userId int, userPwd string) (err error) {
    //下一个就要开始定协议..
    // fmt.Printf(" userId = %d userPwd=%s\n", userId, userPwd)
    // return nil
    //1. 链接到服务器
    conn, err := net.Dial("tcp", "localhost:8889")
    if err != nil {
        fmt.Println("net.Dial err=", err)
        return
    }
    //延时关闭
    defer conn.Close()

    //2. 准备通过conn发送消息给服务
    var mes message.Message
    mes.Type = message.LoginMesType
    //3. 创建一个LoginMes 结构体
    var loginMes message.LoginMes
    loginMes.UserId = userId
    loginMes.UserPwd = userPwd

    //4. 将loginMes 序列化
    data, err := json.Marshal(loginMes)
    if err != nil {
        fmt.Println("json.Marshal err=", err)
        return
    }
    // 5. 把data赋给 mes.Data字段
    mes.Data = string(data)

    // 6. 将 mes进行序列化化
    data, err = json.Marshal(mes)
    if err != nil {
        fmt.Println("json.Marshal err=", err)
        return
    }
    // 7. 到这个时候 data就是我们要发送的消息
    // 7.1 先把 data的长度发送给服务器
    // 先获取到 data的长度->转成一个表示长度的byte切片
    var pkgLen uint32
    pkgLen = uint32(len(data))
    var buf [4]byte
    binary.BigEndian.PutUint32(buf[0:4], pkgLen)
    // 发送长度
    n, err := conn.Write(buf[:4])
    if n != 4 || err != nil {
        fmt.Println("conn.Write(bytes) fail", err)
        return
    }
    //fmt.Printf("客户端,发送消息的长度=%d 内容=%s", len(data), string(data))

    // 发送消息本身
    _, err = conn.Write(data)
    if err != nil {
        fmt.Println("conn.Write(data) fail", err)
        return
    }
    //休眠20
    // time.Sleep(20 * time.Second)
    // fmt.Println("休眠了20..")
    // 这里还需要处理服务器端返回的消息.
    mes, err = readPkg(conn) // mes 就是

    if err != nil {
        fmt.Println("readPkg(conn) err=", err)
        return
    }
    //将mes的Data部分反序列化成 LoginResMes
    var loginResMes message.LoginResMes
    err = json.Unmarshal([]byte(mes.Data), &loginResMes)
    if loginResMes.Code == 200 {
        fmt.Println("登录成功")
    } else if loginResMes.Code == 500 {
        fmt.Println(loginResMes.Error)
    }
    return
}

工具文件

client/utils/utils.go

package utils

import (
    "chatroom/common/message"
    "encoding/binary"
    "encoding/json"
    "fmt"
    "net"
)

//这里将这些方法关联到结构体中
type Transfer struct {
    //分析它应该有哪些字段
    Conn net.Conn
    Buf  [8096]byte //这时传输时,使用缓冲
}

func (this *Transfer) ReadPkg() (mes message.Message, err error) {

    //buf := make([]byte, 8096)
    fmt.Println("读取客户端发送的数据...")
    //conn.Read 在conn没有被关闭的情况下,才会阻塞
    //如果客户端关闭了 conn 则,就不会阻塞
    _, err = this.Conn.Read(this.Buf[:4])
    if err != nil {
        //err = errors.New("read pkg header error")
        return
    }
    //根据buf[:4] 转成一个 uint32类型
    var pkgLen uint32
    pkgLen = binary.BigEndian.Uint32(this.Buf[0:4])
    //根据 pkgLen 读取消息内容
    n, err := this.Conn.Read(this.Buf[:pkgLen])
    if n != int(pkgLen) || err != nil {
        //err = errors.New("read pkg body error")
        return
    }
    //把pkgLen 反序列化成 -> message.Message
    // 技术就是一层窗户纸 &mes!!
    err = json.Unmarshal(this.Buf[:pkgLen], &mes)
    if err != nil {
        fmt.Println("json.Unmarsha err=", err)
        return
    }
    return
}

func (this *Transfer) WritePkg(data []byte) (err error) {

    //先发送一个长度给对方
    var pkgLen uint32
    pkgLen = uint32(len(data))
    //var buf [4]byte
    binary.BigEndian.PutUint32(this.Buf[0:4], pkgLen)
    // 发送长度
    n, err := this.Conn.Write(this.Buf[:4])
    if n != 4 || err != nil {
        fmt.Println("conn.Write(bytes) fail", err)
        return
    }

    //发送data本身
    n, err = this.Conn.Write(data)
    if n != int(pkgLen) || err != nil {
        fmt.Println("conn.Write(bytes) fail", err)
        return
    }
    return
}

全局的curUser

client/model/curlUser.go

package model
import (
    "chatroom/common/message"
    "net"
)
//因为在客户端,我们很多地方会使用到curUser,我们将其作为一个全局
type CurUser struct {
    Conn net.Conn
    message.User
}

客户端发送并处理消息

process文件下进行消息的处理

client/process/server.go

package process

import (
    "chatroom/client/utils"
    "chatroom/common/message"
    "encoding/json"
    "fmt"
    "net"
    "os"
)

//显示登录成功后的界面..
func ShowMenu() {

    fmt.Println("-------恭喜xxx登录成功---------")
    fmt.Println("-------1. 显示在线用户列表---------")
    fmt.Println("-------2. 发送消息---------")
    fmt.Println("-------3. 信息列表---------")
    fmt.Println("-------4. 退出系统---------")
    fmt.Println("请选择(1-4):")
    var key int
    var content string

    //因为,我们总会使用到SmsProcess实例,因此我们将其定义在swtich外部
    smsProcess := &SmsProcess{}
    fmt.Scanf("%d\n", &key)
    switch key {
    case 1:
        //fmt.Println("显示在线用户列表-")
        outputOnlineUser()
    case 2:
        fmt.Println("你想对大家说的什么:)")
        fmt.Scanf("%s\n", &content)
        smsProcess.SendGroupMes(content)
    case 3:
        fmt.Println("信息列表")
    case 4:
        fmt.Println("你选择退出了系统...")
        os.Exit(0)
    default:
        fmt.Println("你输入的选项不正确..")
    }

}

//和服务器保持通讯
func serverProcessMes(conn net.Conn) {
    //创建一个transfer实例, 不停的读取服务器发送的消息
    tf := &utils.Transfer{
        Conn: conn,
    }
    for {
        fmt.Println("客户端正在等待读取服务器发送的消息")
        mes, err := tf.ReadPkg()
        if err != nil {
            fmt.Println("tf.ReadPkg err=", err)
            return
        }
        //如果读取到消息,又是下一步处理逻辑
        switch mes.Type {

        case message.NotifyUserStatusMesType: // 有人上线了

            //1. 取出.NotifyUserStatusMes
            var notifyUserStatusMes message.NotifyUserStatusMes
            json.Unmarshal([]byte(mes.Data), &notifyUserStatusMes)
            //2. 把这个用户的信息,状态保存到客户map[int]User中
            updateUserStatus(&notifyUserStatusMes)
            //处理
        case message.SmsMesType: //有人群发消息
            outputGroupMes(&mes)
        default:
            fmt.Println("服务器端返回了未知的消息类型")
        }
        //fmt.Printf("mes=%v\n", mes)

    }
}

client/process/smsMgr.go

package process
import (
    "chatroom/common/message"
    "encoding/json"
    "fmt"
)
func outputGroupMes(mes *message.Message) { //这个地方mes一定SmsMes
    //显示即可
    //1. 反序列化mes.Data
    var smsMes message.SmsMes
    err := json.Unmarshal([]byte(mes.Data), &smsMes)
    if err != nil {
        fmt.Println("json.Unmarshal err=", err.Error())
        return
    }

    //显示信息
    info := fmt.Sprintf("用户id:\t%d 对大家说:\t%s",
        smsMes.UserId, smsMes.Content)
    fmt.Println(info)
    fmt.Println()
}

client/process/smsProcess.go

package process

import (
    "chatroom/client/utils"
    "chatroom/common/message"
    "encoding/json"
    "fmt"
)

type SmsProcess struct {
}

//发送群聊的消息
func (this *SmsProcess) SendGroupMes(content string) (err error) {

    //1 创建一个Mes
    var mes message.Message
    mes.Type = message.SmsMesType

    //2 创建一个SmsMes 实例
    var smsMes message.SmsMes
    smsMes.Content = content               //内容
    smsMes.UserId = CurUser.UserId         //
    smsMes.UserStatus = CurUser.UserStatus //

    //3.序列化 smsMes
    data, err := json.Marshal(smsMes)
    if err != nil {
        fmt.Println("SendGroupMes json.Marshal fail =", err.Error())
        return
    }

    mes.Data = string(data)

    //4. 对mes再次序列化
    data, err = json.Marshal(mes)
    if err != nil {
        fmt.Println("SendGroupMes json.Marshal fail =", err.Error())
        return
    }

    //5. 将mes发送给服务器。。
    tf := &utils.Transfer{
        Conn: CurUser.Conn,
    }
    //6.发送
    err = tf.WritePkg(data)
    if err != nil {
        fmt.Println("SendGroupMes err=", err.Error())
        return
    }
    return
}

client/process/userMgr.go

package process
import (
    "chatroom/client/model"
    "chatroom/common/message"
    "fmt"
)
//客户端要维护的map
var onlineUsers map[int]*message.User = make(map[int]*message.User, 10)
var CurUser model.CurUser //我们在用户登录成功后,完成对CurUser初始化

//在客户端显示当前在线的用户
func outputOnlineUser() {
    //遍历一把 onlineUsers
    fmt.Println("当前在线用户列表:")
    for id, _ := range onlineUsers {
        //如果不显示自己.
        fmt.Println("用户id:\t", id)
    }
}
//编写一个方法,处理返回的NotifyUserStatusMes
func updateUserStatus(notifyUserStatusMes *message.NotifyUserStatusMes) {

    //适当优化
    user, ok := onlineUsers[notifyUserStatusMes.UserId]
    if !ok { //原来没有
        user = &message.User{
            UserId: notifyUserStatusMes.UserId,
        }
    }
    user.UserStatus = notifyUserStatusMes.Status
    onlineUsers[notifyUserStatusMes.UserId] = user

    outputOnlineUser()
}

client/process/userProcess.go

package process

import (
    "chatroom/client/utils"
    "chatroom/common/message"
    "encoding/binary"
    "encoding/json"
    "fmt"
    "net"
    "os"
)

type UserProcess struct {
    //暂时不需要字段..
}

func (this *UserProcess) Register(userId int,
    userPwd string, userName string) (err error) {

    //1. 链接到服务器
    conn, err := net.Dial("tcp", "localhost:8889")
    if err != nil {
        fmt.Println("net.Dial err=", err)
        return
    }
    //延时关闭
    defer conn.Close()

    //2. 准备通过conn发送消息给服务
    var mes message.Message
    mes.Type = message.RegisterMesType
    //3. 创建一个LoginMes 结构体
    var registerMes message.RegisterMes
    registerMes.User.UserId = userId
    registerMes.User.UserPwd = userPwd
    registerMes.User.UserName = userName

    //4.将registerMes 序列化
    data, err := json.Marshal(registerMes)
    if err != nil {
        fmt.Println("json.Marshal err=", err)
        return
    }

    // 5. 把data赋给 mes.Data字段
    mes.Data = string(data)

    // 6. 将 mes进行序列化化
    data, err = json.Marshal(mes)
    if err != nil {
        fmt.Println("json.Marshal err=", err)
        return
    }

    //创建一个Transfer 实例
    tf := &utils.Transfer{
        Conn: conn,
    }

    //发送data给服务器端
    err = tf.WritePkg(data)
    if err != nil {
        fmt.Println("注册发送信息错误 err=", err)
    }

    mes, err = tf.ReadPkg() // mes 就是 RegisterResMes

    if err != nil {
        fmt.Println("readPkg(conn) err=", err)
        return
    }

    //将mes的Data部分反序列化成 RegisterResMes
    var registerResMes message.RegisterResMes
    err = json.Unmarshal([]byte(mes.Data), &registerResMes)
    if registerResMes.Code == 200 {
        fmt.Println("注册成功, 你重新登录一把")
        os.Exit(0)
    } else {
        fmt.Println(registerResMes.Error)
        os.Exit(0)
    }
    return
}

//给关联一个用户登录的方法
//写一个函数,完成登录
func (this *UserProcess) Login(userId int, userPwd string) (err error) {

    //下一个就要开始定协议..
    // fmt.Printf(" userId = %d userPwd=%s\n", userId, userPwd)

    // return nil

    //1. 链接到服务器
    conn, err := net.Dial("tcp", "localhost:8889")
    if err != nil {
        fmt.Println("net.Dial err=", err)
        return
    }
    //延时关闭
    defer conn.Close()

    //2. 准备通过conn发送消息给服务
    var mes message.Message
    mes.Type = message.LoginMesType
    //3. 创建一个LoginMes 结构体
    var loginMes message.LoginMes
    loginMes.UserId = userId
    loginMes.UserPwd = userPwd

    //4. 将loginMes 序列化
    data, err := json.Marshal(loginMes)
    if err != nil {
        fmt.Println("json.Marshal err=", err)
        return
    }
    // 5. 把data赋给 mes.Data字段
    mes.Data = string(data)

    // 6. 将 mes进行序列化化
    data, err = json.Marshal(mes)
    if err != nil {
        fmt.Println("json.Marshal err=", err)
        return
    }

    // 7. 到这个时候 data就是我们要发送的消息
    // 7.1 先把 data的长度发送给服务器
    // 先获取到 data的长度->转成一个表示长度的byte切片
    var pkgLen uint32
    pkgLen = uint32(len(data))
    var buf [4]byte
    binary.BigEndian.PutUint32(buf[0:4], pkgLen)
    // 发送长度
    n, err := conn.Write(buf[:4])
    if n != 4 || err != nil {
        fmt.Println("conn.Write(bytes) fail", err)
        return
    }

    fmt.Printf("客户端,发送消息的长度=%d 内容=%s", len(data), string(data))

    // 发送消息本身
    _, err = conn.Write(data)
    if err != nil {
        fmt.Println("conn.Write(data) fail", err)
        return
    }

    //休眠20
    // time.Sleep(20 * time.Second)
    // fmt.Println("休眠了20..")
    // 这里还需要处理服务器端返回的消息.
    //创建一个Transfer 实例
    tf := &utils.Transfer{
        Conn: conn,
    }
    mes, err = tf.ReadPkg() // mes 就是

    if err != nil {
        fmt.Println("readPkg(conn) err=", err)
        return
    }

    //将mes的Data部分反序列化成 LoginResMes
    var loginResMes message.LoginResMes
    err = json.Unmarshal([]byte(mes.Data), &loginResMes)
    if loginResMes.Code == 200 {
        //初始化CurUser
        CurUser.Conn = conn
        CurUser.UserId = userId
        CurUser.UserStatus = message.UserOnline

        //fmt.Println("登录成功")
        //可以显示当前在线用户列表,遍历loginResMes.UsersId
        fmt.Println("当前在线用户列表如下:")
        for _, v := range loginResMes.UsersId {
            //如果我们要求不显示自己在线,下面我们增加一个代码
            if v == userId {
                continue
            }

            fmt.Println("用户id:\t", v)
            //完成 客户端的 onlineUsers 完成初始化
            user := &message.User{
                UserId:     v,
                UserStatus: message.UserOnline,
            }
            onlineUsers[v] = user
        }
        fmt.Print("\n\n")

        //这里我们还需要在客户端启动一个协程
        //该协程保持和服务器端的通讯.如果服务器有数据推送给客户端
        //则接收并显示在客户端的终端.
        go serverProcessMes(conn)

        //1. 显示我们的登录成功的菜单[循环]..
        for {
            ShowMenu()
        }
    } else {
        fmt.Println(loginResMes.Error)
    }
    return
}

服务器端完成用户登录

显示服务器开启监听

完成客户端可以发送消息长度,服务器端可以正常收到该长度

代码实现:

server/main/main.go

package main

import (
    "chatroom/server/model"
    "fmt"
    "net"
    "time"
)
//处理和客户端的通讯
func process(conn net.Conn) {
    //这里需要延时关闭conn
    defer conn.Close()

    //这里调用总控, 创建一个
    processor := &Processor{
        Conn: conn,
    }
    err := processor.process2()
    if err != nil {
        fmt.Println("客户端和服务器通讯协程错误=err", err)
        return
    }
}

func init() {
    //当服务器启动时,我们就去初始化我们的redis的连接池
    initPool("localhost:6379", 16, 0, 300*time.Second)
    initUserDao()
}

//这里我们编写一个函数,完成对UserDao的初始化任务
func initUserDao() {
    //这里的pool 本身就是一个全局的变量
    //这里需要注意一个初始化顺序问题
    //initPool, 在 initUserDao
    model.MyUserDao = model.NewUserDao(pool)
}
func main() {
    //提示信息
    fmt.Println("服务器[新的结构]在8889端口监听....")
    listen, err := net.Listen("tcp", "0.0.0.0:8889")
    defer listen.Close()
    if err != nil {
        fmt.Println("net.Listen err=", err)
        return
    }
    //一旦监听成功,就等待客户端来链接服务器
    for {
        fmt.Println("等待客户端来链接服务器.....")
        conn, err := listen.Accept()
        if err != nil {
            fmt.Println("listen.Accept err=", err)
        }

        //一旦链接成功,则启动一个协程和客户端保持通讯。。
        go process(conn)
    }
}

server/main/processor.go

package main

import (
    "chatroom/common/message"
    process2 "chatroom/server/process"
    "chatroom/server/utils"
    "fmt"
    "io"
    "net"
)

//先创建一个Processor 的结构体体
type Processor struct {
    Conn net.Conn
}

//编写一个ServerProcessMes 函数
//功能:根据客户端发送消息种类不同,决定调用哪个函数来处理
func (this *Processor) serverProcessMes(mes *message.Message) (err error) {

    //看看是否能接收到客户端发送的群发的消息
    fmt.Println("mes=", mes)

    switch mes.Type {
    case message.LoginMesType:
        //处理登录登录
        //创建一个UserProcess实例
        up := &process2.UserProcess{
            Conn: this.Conn,
        }
        err = up.ServerProcessLogin(mes)
    case message.RegisterMesType:
        //处理注册
        up := &process2.UserProcess{
            Conn: this.Conn,
        }
        err = up.ServerProcessRegister(mes) // type : data
    case message.SmsMesType:
        //创建一个SmsProcess实例完成转发群聊消息.
        smsProcess := &process2.SmsProcess{}
        smsProcess.SendGroupMes(mes)
    default:
        fmt.Println("消息类型不存在,无法处理...")
    }
    return
}

func (this *Processor) process2() (err error) {

    //循环的客户端发送的信息
    for {
        //这里我们将读取数据包,直接封装成一个函数readPkg(), 返回Message, Err
        //创建一个Transfer 实例完成读包任务
        tf := &utils.Transfer{
            Conn: this.Conn,
        }
        mes, err := tf.ReadPkg()
        if err != nil {
            if err == io.EOF {
                fmt.Println("客户端退出,服务器端也退出..")
                return err
            } else {
                fmt.Println("readPkg err=", err)
                return err
            }

        }
        err = this.serverProcessMes(&mes)
        if err != nil {
            return err
        }
    }

}

建立Redis连接池

server/main/redis.go

package main
import (
    "github.com/garyburd/redigo/redis"
    "time"
)

//定义一个全局的pool
var pool *redis.Pool

func initPool(address string, maxIdle, maxActive int, idleTimeout time.Duration) {

    pool = &redis.Pool{
        MaxIdle: maxIdle, //最大空闲链接数
        MaxActive: maxActive, // 表示和数据库的最大链接数, 0 表示没有限制
        IdleTimeout: idleTimeout, // 最大空闲时间
        Dial: func() (redis.Conn, error) { // 初始化链接的代码, 链接哪个ip的redis
        return redis.Dial("tcp", address)
        },
    }
}

服务器端完成注册用户和群聊

common文件提起公共的部分

common/message/message.go

package message

const (
    LoginMesType             = "LoginMes"
    LoginResMesType            = "LoginResMes"
    RegisterMesType            = "RegisterMes"
    RegisterResMesType         = "RegisterResMes"
    NotifyUserStatusMesType = "NotifyUserStatusMes"
    SmsMesType                = "SmsMes"
)

//这里我们定义几个用户状态的常量
const (
    UserOnline = iota
    UserOffline 
    UserBusyStatus 
)

type Message struct {
    Type string `json:"type"`  //消息类型
    Data string `json:"data"` //消息的类型
}



//定义两个消息..后面需要再增加

type LoginMes struct {
    UserId int `json:"userId"` //用户id
    UserPwd string `json:"userPwd"` //用户密码
    UserName string `json:"userName"` //用户名
}

type LoginResMes struct {
    Code int  `json:"code"` // 返回状态码 500 表示该用户未注册 200表示登录成功
    UsersId []int            // 增加字段,保存用户id的切片
    Error string `json:"error"` // 返回错误信息
}

type RegisterMes struct {
    User User `json:"user"`//类型就是User结构体.
}
type RegisterResMes struct {
    Code int  `json:"code"` // 返回状态码 400 表示该用户已经占有 200表示注册成功
    Error string `json:"error"` // 返回错误信息
}

//为了配合服务器端推送用户状态变化的消息
type NotifyUserStatusMes struct {
    UserId int `json:"userId"` //用户id
    Status int `json:"status"` //用户的状态
}

//增加一个SmsMes //发送的消息
type SmsMes struct {
    Content string `json:"content"` //内容
    User //匿名结构体,继承
}

// SmsReMes

common/message/user.go

package message

//定义一个用户的结构体

type User struct {
    //确定字段信息
    //为了序列化和反序列化成功,我们必须保证
    //用户信息的json字符串的key 和 结构体的字段对应的 tag 名字一致!!!
    UserId int `json:"userId"` 
    UserPwd string `json:"userPwd"`
    UserName string `json:"userName"`
    UserStatus int `json:"userStatus"` //用户状态..
    Sex string `json:"sex"` //性别.
}

工具文件

server/utils/utils.go

package utils

import (
    "chatroom/common/message"
    "encoding/binary"
    "encoding/json"
    "fmt"
    "net"
)

//这里将这些方法关联到结构体中
type Transfer struct {
    //分析它应该有哪些字段
    Conn net.Conn
    Buf  [8096]byte //这时传输时,使用缓冲
}

func (this *Transfer) ReadPkg() (mes message.Message, err error) {

    //buf := make([]byte, 8096)
    fmt.Println("读取客户端发送的数据...")
    //conn.Read 在conn没有被关闭的情况下,才会阻塞
    //如果客户端关闭了 conn 则,就不会阻塞
    _, err = this.Conn.Read(this.Buf[:4])
    if err != nil {
        //err = errors.New("read pkg header error")
        return
    }
    //根据buf[:4] 转成一个 uint32类型
    var pkgLen uint32
    pkgLen = binary.BigEndian.Uint32(this.Buf[0:4])
    //根据 pkgLen 读取消息内容
    n, err := this.Conn.Read(this.Buf[:pkgLen])
    if n != int(pkgLen) || err != nil {
        //err = errors.New("read pkg body error")
        return
    }
    //把pkgLen 反序列化成 -> message.Message
    // 技术就是一层窗户纸 &mes!!
    err = json.Unmarshal(this.Buf[:pkgLen], &mes)
    if err != nil {
        fmt.Println("json.Unmarsha err=", err)
        return
    }
    return
}

func (this *Transfer) WritePkg(data []byte) (err error) {
    //先发送一个长度给对方
    var pkgLen uint32
    pkgLen = uint32(len(data))
    //var buf [4]byte
    binary.BigEndian.PutUint32(this.Buf[0:4], pkgLen)
    // 发送长度
    n, err := this.Conn.Write(this.Buf[:4])
    if n != 4 || err != nil {
        fmt.Println("conn.Write(bytes) fail", err)
        return
    }

    //发送data本身
    n, err = this.Conn.Write(data)
    if n != int(pkgLen) || err != nil {
        fmt.Println("conn.Write(bytes) fail", err)
        return
    }
    return
}

服务器端完成登录时能返回当前在线用户

process文件下代码实现:

server/process/userMgr.go

package process2
import (
    "fmt"
)
//因为UserMgr 实例在服务器端有且只有一个
//因为在很多的地方,都会使用到,因此,我们
//将其定义为全局变量
var (
    userMgr *UserMgr
)
type UserMgr struct {
    onlineUsers map[int]*UserProcess
}
//完成对userMgr初始化工作
func init() {
    userMgr = &UserMgr{
        onlineUsers : make(map[int]*UserProcess, 1024),
    }
}
//完成对onlineUsers添加
func (this *UserMgr) AddOnlineUser(up *UserProcess) {
    this.onlineUsers[up.UserId] = up
}
//删除
func (this *UserMgr) DelOnlineUser(userId int) {
    delete(this.onlineUsers, userId)
}
//返回当前所有在线的用户
func (this *UserMgr) GetAllOnlineUser() map[int]*UserProcess {
    return this.onlineUsers
}
//根据id返回对应的值
func (this *UserMgr) GetOnlineUserById(userId int) (up *UserProcess, err error) {

    //如何从map取出一个值,带检测方式
    up, ok := this.onlineUsers[userId]
    if !ok { //说明,你要查找的这个用户,当前不在线。
        err = fmt.Errorf("用户%d 不存在", userId)
        return 
    }
    return 
}

server/process/smsProcess.go

package process2

import (
    "chatroom/common/message"
    "chatroom/server/utils"
    "fmt"
    "net"

    "encoding/json"
)

type SmsProcess struct {
    //..[暂时不需字段]
}

//写方法转发消息
func (this *SmsProcess) SendGroupMes(mes *message.Message) {

    //遍历服务器端的onlineUsers map[int]*UserProcess,
    //将消息转发取出.
    //取出mes的内容 SmsMes
    var smsMes message.SmsMes
    err := json.Unmarshal([]byte(mes.Data), &smsMes)
    if err != nil {
        fmt.Println("json.Unmarshal err=", err)
        return
    }

    data, err := json.Marshal(mes)
    if err != nil {
        fmt.Println("json.Marshal err=", err)
        return
    }

    for id, up := range userMgr.onlineUsers {
        //这里,还需要过滤到自己,即不要再发给自己
        if id == smsMes.UserId {
            continue
        }
        this.SendMesToEachOnlineUser(data, up.Conn)
    }
}
func (this *SmsProcess) SendMesToEachOnlineUser(data []byte, conn net.Conn) {

    //创建一个Transfer 实例,发送data
    tf := &utils.Transfer{
        Conn: conn, //
    }
    err := tf.WritePkg(data)
    if err != nil {
        fmt.Println("转发消息失败 err=", err)
    }
}

server/process/userProcess.go

package process2

import (
    "chatroom/common/message"
    "chatroom/server/model"
    "chatroom/server/utils"
    "encoding/json"
    "fmt"
    "net"
)

type UserProcess struct {
    //字段
    Conn net.Conn
    //增加一个字段,表示该Conn是哪个用户
    UserId int
}

//这里我们编写通知所有在线的用户的方法
//userId 要通知其它的在线用户,我上线
func (this *UserProcess) NotifyOthersOnlineUser(userId int) {

    //遍历 onlineUsers, 然后一个一个的发送 NotifyUserStatusMes
    for id, up := range userMgr.onlineUsers {
        //过滤到自己
        if id == userId {
            continue
        }
        //开始通知【单独的写一个方法】
        up.NotifyMeOnline(userId)
    }
}

func (this *UserProcess) NotifyMeOnline(userId int) {

    //组装我们的NotifyUserStatusMes
    var mes message.Message
    mes.Type = message.NotifyUserStatusMesType

    var notifyUserStatusMes message.NotifyUserStatusMes
    notifyUserStatusMes.UserId = userId
    notifyUserStatusMes.Status = message.UserOnline

    //将notifyUserStatusMes序列化
    data, err := json.Marshal(notifyUserStatusMes)
    if err != nil {
        fmt.Println("json.Marshal err=", err)
        return
    }
    //将序列化后的notifyUserStatusMes赋值给 mes.Data
    mes.Data = string(data)

    //对mes再次序列化,准备发送.
    data, err = json.Marshal(mes)
    if err != nil {
        fmt.Println("json.Marshal err=", err)
        return
    }

    //发送,创建我们Transfer实例,发送
    tf := &utils.Transfer{
        Conn: this.Conn,
    }

    err = tf.WritePkg(data)
    if err != nil {
        fmt.Println("NotifyMeOnline err=", err)
        return
    }
}

func (this *UserProcess) ServerProcessRegister(mes *message.Message) (err error) {

    //1.先从mes 中取出 mes.Data ,并直接反序列化成RegisterMes
    var registerMes message.RegisterMes
    err = json.Unmarshal([]byte(mes.Data), &registerMes)
    if err != nil {
        fmt.Println("json.Unmarshal fail err=", err)
        return
    }

    //1先声明一个 resMes
    var resMes message.Message
    resMes.Type = message.RegisterResMesType
    var registerResMes message.RegisterResMes

    //我们需要到redis数据库去完成注册.
    //1.使用model.MyUserDao 到redis去验证
    err = model.MyUserDao.Register(&registerMes.User)

    if err != nil {
        if err == model.ERROR_USER_EXISTS {
            registerResMes.Code = 505
            registerResMes.Error = model.ERROR_USER_EXISTS.Error()
        } else {
            registerResMes.Code = 506
            registerResMes.Error = "注册发生未知错误..."
        }
    } else {
        registerResMes.Code = 200
    }

    data, err := json.Marshal(registerResMes)
    if err != nil {
        fmt.Println("json.Marshal fail", err)
        return
    }

    //4. 将data 赋值给 resMes
    resMes.Data = string(data)

    //5. 对resMes 进行序列化,准备发送
    data, err = json.Marshal(resMes)
    if err != nil {
        fmt.Println("json.Marshal fail", err)
        return
    }
    //6. 发送data, 我们将其封装到writePkg函数
    //因为使用分层模式(mvc), 我们先创建一个Transfer 实例,然后读取
    tf := &utils.Transfer{
        Conn: this.Conn,
    }
    err = tf.WritePkg(data)
    return

}

//编写一个函数serverProcessLogin函数, 专门处理登录请求
func (this *UserProcess) ServerProcessLogin(mes *message.Message) (err error) {
    //核心代码...
    //1. 先从mes 中取出 mes.Data ,并直接反序列化成LoginMes
    var loginMes message.LoginMes
    err = json.Unmarshal([]byte(mes.Data), &loginMes)
    if err != nil {
        fmt.Println("json.Unmarshal fail err=", err)
        return
    }
    //1先声明一个 resMes
    var resMes message.Message
    resMes.Type = message.LoginResMesType
    //2在声明一个 LoginResMes,并完成赋值
    var loginResMes message.LoginResMes

    //我们需要到redis数据库去完成验证.
    //1.使用model.MyUserDao 到redis去验证
    user, err := model.MyUserDao.Login(loginMes.UserId, loginMes.UserPwd)

    if err != nil {

        if err == model.ERROR_USER_NOTEXISTS {
            loginResMes.Code = 500
            loginResMes.Error = err.Error()
        } else if err == model.ERROR_USER_PWD {
            loginResMes.Code = 403
            loginResMes.Error = err.Error()
        } else {
            loginResMes.Code = 505
            loginResMes.Error = "服务器内部错误..."
        }

    } else {
        loginResMes.Code = 200
        //这里,因为用户登录成功,我们就把该登录成功的用放入到userMgr中
        //将登录成功的用户的userId 赋给 this
        this.UserId = loginMes.UserId
        userMgr.AddOnlineUser(this)
        //通知其它的在线用户, 我上线了
        this.NotifyOthersOnlineUser(loginMes.UserId)
        //将当前在线用户的id 放入到loginResMes.UsersId
        //遍历 userMgr.onlineUsers
        for id, _ := range userMgr.onlineUsers {
            loginResMes.UsersId = append(loginResMes.UsersId, id)
        }
        fmt.Println(user, "登录成功")
    }
    // //如果用户id= 100, 密码=123456, 认为合法,否则不合法

    // if loginMes.UserId == 100 && loginMes.UserPwd == "123456" {
    //     //合法
    //     loginResMes.Code = 200

    // } else {
    //     //不合法
    //     loginResMes.Code = 500 // 500 状态码,表示该用户不存在
    //     loginResMes.Error = "该用户不存在, 请注册再使用..."
    // }

    //3将 loginResMes 序列化
    data, err := json.Marshal(loginResMes)
    if err != nil {
        fmt.Println("json.Marshal fail", err)
        return
    }

    //4. 将data 赋值给 resMes
    resMes.Data = string(data)

    //5. 对resMes 进行序列化,准备发送
    data, err = json.Marshal(resMes)
    if err != nil {
        fmt.Println("json.Marshal fail", err)
        return
    }
    //6. 发送data, 我们将其封装到writePkg函数
    //因为使用分层模式(mvc), 我们先创建一个Transfer 实例,然后读取
    tf := &utils.Transfer{
        Conn: this.Conn,
    }
    err = tf.WritePkg(data)
    return
}

效果展示

打开Redis服务器

注册用户

在Redis中查看用户

服务器端传输的信息

登录用户

服务器端显示的登录信息

显示当前登录和发送消息

服务器端显示的发送信息

实现群发功能

采用终端方式

在GOPATH的目录下,即项目src下cmd打开终端

编译clinet和server

开一个服务器,三个客户端,登录进行群发

在id为102的用户上查看在线用户

在id为102的用户上进行群发

对应服务器中的数据变化

退出


文章作者: 读序
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 读序 !
  目录