广告位联系
返回顶部
分享到

使用go语言实现Redis持久化的代码

Golang 来源:互联网 作者:佚名 发布时间:2024-07-14 08:38:49 人浏览
摘要

redis是一个内存数据库,如果你把进程杀掉,那么里面存储的数据都会消失,那么这篇文章就是来解决redis持久化的问题 我们在redis.conf文件中增加两个配置 1 2 appendonly yes appendfilename appendonly.

redis 是一个内存数据库,如果你把进程杀掉,那么里面存储的数据都会消失,那么这篇文章就是来解决 redis 持久化的问题

我们在 redis.conf 文件中增加两个配置

1

2

appendonly yes

appendfilename appendonly.aof

  • appenonly 表示只追加
  • appendfilename 表示追加到那什么文件中

指令: *3\r\n$3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nvalue\r\n 落在 appendonly.aof 文件中

1

2

3

4

5

6

7

*3

$3

SET

$3

KEY

$5

value

这里要实现的功能就是把用户发过来的指令,用 RESP 的形式记录在 appendonly.aof 文件中

这个文件是在机器的硬盘上,当 redis 停了之后,内存中的数据都没了,但这个文件会保存下

redis 重启后,会读取这个文件,把之前内存中的数据再次加载回来

定义 AofHandler

在项目下新建文件 aof/aof.go

在里面定义一个 AofHandler 结构体,它的作用就是用来处理 appendonly.aof 文件

1

2

3

4

5

6

7

type AofHandler struct {

    database    databaseface.Database // 持有 db,db 有业务核心

    aofFile     *os.File              // 持有 aof 文件

    aofFilename string                // aof 文件名

    currentDB   int                   // 当前 db

    aofChan     chan *payload         // 写文件的缓冲区

}

这里有注意的是 aofChan,它是写文件的缓冲区

因为从文件中读取指令,指令是非常密集的,但是将指令写入硬盘时非常慢的,我们又不可能每次都等待指令写完成后再去操作 redis

这时我们就把所有想写入 aof 文件的指令放到 aofChan 中,然后在另一个 goroutine 中去写入硬盘

所以这个 aofChan 的类型是 payload 结构体

1

2

3

4

5

type CmdLine = [][]byte

type payload struct {

  cmdLine CmdLine // 指令

  dbIndex int     // db 索引

}

AofHandler 结构体定义好之后,我们需要定义一个 NewAofHandler 函数来初始化 AofHandler 结构体

还需要定义一个 AddAof 方法,用来往 aofChan 中添加指令

放到缓冲区之后,还需要一个方法 HandleAof 将指令写入硬盘

最后还要实现一个从硬盘加载 aof 文件到内存的的函数 LoadAof

实现 NewAofHandler

NewAofHandler 函数用来初始化 AofHandler 结构体

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

func NewAofHandler(database databaseface.Database) (*AofHandler, error) {

  // 初始化 AofHandler 结构体

  handler := &AofHandler{}

  // 从配置文件中读取 aof 文件名

  handler.aofFilename = config.Properties.AppendFilename

  // 持有 db

  handler.database = database

  // 从硬盘加载 aof 文件

  handler.LoadAof()

  // 打开 aof 文件, 如果不存在则创建

  aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)

  if err != nil {

    return nil, err

  }

  // 持有 aof 文件

  handler.aofFile = aofFile

  // 初始化 aofChan

  handler.aofChan = make(chan *payload, aofBufferSize)

  // 启动一个 goroutine 处理 aofChan

  go func() {

    handler.HandleAof()

  }()

  // 返回 AofHandler 结构体

  return handler, nil

}

实现 AddAof

AddAof 方法用来往 aofChan 中添加指令,它不做落盘的操作

因为在执行指令的时候,等待它落盘的话,效率太低了,所以我们把指令放到 aofChan 中,然后在另一个 goroutine 中去处理

1

2

3

4

5

6

7

8

9

10

func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {

  // 如果配置文件中的 appendonly 为 true 并且 aofChan 不为 nil

  if config.Properties.AppendOnly && handler.aofChan != nil {

    // 往 aofChan 中添加指令

    handler.aofChan <- &payload{

      cmdLine: cmdLine,

      dbIndex: dbIndex,

    }

  }

}

实现 HandleAof

HandleAof 方法用来处理 aofChan 中的指令,将指令写入硬盘

currentDB 记录的是当前工作的 DB,如果切换了 DB,会在 aof 文件中插入 select 0 这样切换 DB 的语句

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

func (handler *AofHandler) HandleAof() {

  // 初始化 currentDB

  handler.currentDB = 0

  // 遍历 chan

  for p := range handler.aofChan {

    // 如果当前 db 不等于上一次工作的 db,就要插入一条 select 语句

    if p.dbIndex != handler.currentDB {

      // 我们要把 select 0 编码成 RESP 格式

      // 也就是 *2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n

      data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()

      // 写入 aof 文件

      _, err := handler.aofFile.Write(data)

      if err != nil {

        logger.Warn(err)

        continue

      }

      // 更新 currentDB

      handler.currentDB = p.dbIndex

    }

    // 这里是插入正常的指令

    data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()

    // 写入 aof 文件

    _, err := handler.aofFile.Write(data)

    if err != nil {

      logger.Warn(err)

    }

  }

}

实现 Aof 落盘功能

我们之前在实现指令的部分,都是直接执行指令,现在我们要把指令写入 aof 文件

我们在 StandaloneDatabase 结构体中增加一个 aofHandler 字段

1

2

3

4

type StandaloneDatabase struct {

  dbSet      []*DB

  aofHandler *aof.AofHandler // 增加落盘功能

}

然后新建 database 时需要对 aofHandler 进行初始化

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

func NewStandaloneDatabase() *StandaloneDatabase {

  // ...

  // 先看下配置文件中的 appendonly 是否为 true

  if config.Properties.AppendOnly {

    // 初始化 aofHandler

    aofHandler, err := aof.NewAofHandler(database)

    if err != nil {

      panic(err)

    }

    // 持有 aofHandler

    database.aofHandler = aofHandler

    // 遍历 dbSet

    for _, db := range database.dbSet {

      // 解决闭包问题

      sdb := db

      // 为每个 db 添加 AddAof 方法

      // 这个 addAof 方法是在执行指令的时候调用的

      sdb.addAof = func(line CmdLine) {

        database.aofHandler.AddAof(sdb.index, line)

      }

    }

  }

  return database

}

这里要注意的是 addAof 方法,它是在执行指令的时候调用的

因为我们需要在指令中调用 Addaof 函数,实现指令写入 aof 文件

但是在指令中,???们只能拿到 db,db 上又没有操作 aof 相关的方法,所以我们需要在 db 中增加一个 addAof 方法

1

2

3

4

5

type DB struct {

    index  int           // 数据的编号

    data   dict.Dict     // 数据类型

    addAof func(CmdLine) // 每个 db 都有一个 addAof 方法

}

然后就在需要落盘的指令中调用 addAof 方法

DEL 方法需要记录下来,因为 DEL 方法是删除数据的,如果不记录下来,那么 aof 文件中的数据就会和内存中的数据不一致

1

2

3

4

5

6

7

8

// DEL K1 K2 K3

func DEL(db *DB, args [][]byte) resp.Reply {

  deleted := db.Removes(keys...)

  // delete 大于 0 说明有数据被删除

  if deleted > 0 {

    db.addAof(utils.ToCmdLine2("DEL", args...))

  }

}

FLUSHDB 方法也需要记录下来,因为 FLUSHDB 方法是删除当前 DB 中的所有数据

1

2

3

4

// FLUSHDB

func FLUSHDB(db *DB, args [][]byte) resp.Reply {

    db.addAof(utils.ToCmdLine2("FLUSEHDB", args...))

}

RENAME 和 RENAMENX 方法也需要记录下来,因为这两个方法是修改 key 的名字

1

2

3

4

5

6

7

8

9

// RENAME K1 K2

func RENAME(db *DB, args [][]byte) resp.Reply {

  db.addAof(utils.ToCmdLine2("RENAME", args...))

}

 

// RENAMENX K1 K2

func RENAMENX(db *DB, args [][]byte) resp.Reply {

  db.addAof(utils.ToCmdLine2("RENAMENX", args...))

}

SET 和 SETNX 方法也需要记录下来,因为这两个方法是设置数据的

1

2

3

4

5

6

7

8

9

// SET K1 v

func SET(db *DB, args [][]byte) resp.Reply {

  db.addAof(utils.ToCmdLine2("SET", args...))

}

 

// SETNX K1 v

func SETNX(db *DB, args [][]byte) resp.Reply {

  db.addAof(utils.ToCmdLine2("SETNX", args...))

}

GETSET 方法也需要记录下来,因为这个方法是设置数据的同时返回旧数据

1

2

3

4

// GETSET K1 v1

func GETSET(db *DB, args [][]byte) resp.Reply {

  db.addAof(utils.ToCmdLine2("GETSET", args...))

}

实现 LoadAof

LoadAof 方法用来从硬盘加载 aof 文件到内存

aof 中的指令是符合 RESP 协议的,我们就可以把这些指令当成用户发过来的指令,执行就可以了

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

func (handler *AofHandler) LoadAof() {

  // 打开 aof 文件

  file, err := os.Open(handler.aofFilename)

  if err != nil {

    logger.Error(err)

    return

  }

  // 关闭文件

  defer func() {

    _ = file.Close()

  }()

  // 创建一个 RESP 解析器,将 file 传入,解析后的指令会放到 chan 中

  ch := parser.ParseStream(file)

  fackConn := &connection.Connection{}

  // 遍历 chan,执行指令

  for p := range ch {

    if p.Err != nil {

      // 如果是 EOF,说明文件读取完毕

      if p.Err == io.EOF {

        break

      }

      logger.Error(err)

      continue

    }

    if p.Data == nil {

      logger.Error("empty payload")

      continue

    }

    // 将指令转换成 MultiBulkReply 类型

    r, ok := p.Data.(*reply.MultiBulkReply)

    if !ok {

      logger.Error("exec multi mulk")

      continue

    }

    // 执行指令

    rep := handler.database.Exec(fackConn, r.Args)

    if reply.IsErrReply(rep) {

      logger.Error(rep)

    }

  }

}


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 :
相关文章
  • Go中gin框架的*gin.Context参数常见实用方法
    梗概: *gin.Context是处理HTTP请求的核心。ctx代表context(上下文),它包含了处理请求所需的所有信息和方法,例如请求数据、响应构建器、
  • Python实现图像添加水印的方法
    在日常图像处理中,为图片添加水印是一项常见任务。有多种方法和工具可供选择,而今天我们将专注于使用Python语言结合PIL库批量添加水
  • go的defer和return的执行顺序介绍
    go的defer和return的执行顺序 go的defer和return是golang中的两个关键字,return用于返回函数的返回值,也可以参与一定的流程控制,比如下面代码
  • 使用go语言实现Redis持久化的代码
    redis是一个内存数据库,如果你把进程杀掉,那么里面存储的数据都会消失,那么这篇文章就是来解决redis持久化的问题 我们在redis.conf文件
  • Go中的Timer和Ticker介绍
    一:简介 在日常开发中,我们可能会遇到需要延迟执行或周期性地执行一些任务。这个时候就需要用到Go语言中的定时器。 在Go语言中,定
  • Golang在gin框架中使用JWT鉴权
    什么是JWT JWT,全称 JSON Web Token,是一种开放标准(RFC 7519),用于安全地在双方之间传递信息。尤其适用于身份验证和授权场景。JWT 的设计
  • Golang使用Redis与连接池方式
    Golang使用Redis与连接池 使用下载go的redis包go get github.com/gomodule/redigo/redis 如果网不好的话就很费劲了 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 2
  • Golang使用embed引入静态文件的代码
    Go 语言从 1.16 版本开始引入了一个新的标准库embed,可以在二进制文件中引入静态文件 指令:/go:embed 通过一个简单的小实例,来演示将静态
  • Golang发送Get和Post请求的实现
    最近在研究钉钉机器人,发现钉钉的第三方接口有时需要用Get或者Post请求访问,虽然说可以通过Apifox直接模拟发送请求,但是我还是想研究
  • Go实现数据脱敏的方案设计
    在一些常见的业务场景中可能涉及到用户的手机号,银行卡号等敏感数据,对于这部分的数据经常需要进行数据脱敏处理,就是将此部分数
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计