分享一个mysql备份数据脚本

分享一个mysql备份数据脚本

         今天跟大家分享一个自己用Go语言编写的MySQL数据库备份脚本。

         它支持按需忽略指定表的数据备份,也可以自定义定时策略,方便大家根据实际场景灵活调整。

         这个脚本源自于我个人工作中的实际需求,经过多次使用和优化,稳定性和实用性都不错。如果你有类似需求,可以参考看看,也欢迎交流改进~

         一、创建配置文件

         在项目目录下创建config.yaml文件,用于配置脚本参数,这里可以设置备份数据库链接信息,忽略指定表的数据备份,定义定时策略,同时还可以

指定备份文件存放地址,目前可以存放本地、腾讯云、七牛云,配置文件如下

#备份数据表链接
mysql:
  host: "127.0.0.1"
  port: 3306
  user: "自己数据库账户"
  password: "自己数据库密码"
  database: "需要备份数据库名称"
#不需要备份数据表
skip_tables:
  - "node_device"
#固定触发时间
schedule_time:
  - "0 52 09 * * *"    #每天9点52备份一次
  - "0 */3 * * * *"    #每3分钟备份一次
  - "0 0 9 * * 1"      #每周一9点备份一次
#文件保存方式 空:本地 cos:腾讯云cos qiNiu:七牛云
save: ""
#本地目录配置
localPath : "data"
#腾讯云配置
cos:
  secretID: ""
  secretKey: ""
  region : ""
  catalogue : ""
#七牛云配置
qiNiu:
  accessKey: ""
  secretKey: ""
  bucket : ""
  catalogue : ""
  region: ""  # 新增区域配置:z0-华东, z1-华北, z2-华南, na0-北美, as0-东南亚

我这里默认是存放在本地,大家可以根据自己需求进行配置。

       二、脚本代码

      在项目目录下创建main.go文件,这里将是我们脚本的运行逻辑代码,我这里直接贴出代码

package main

//引入扩展
import (
    "context"
    "database/sql"
    "fmt"
    _ "github.com/go-sql-driver/mysql"
    "github.com/qiniu/go-sdk/v7/auth"
    "github.com/qiniu/go-sdk/v7/storage"
    "github.com/robfig/cron/v3"
    "github.com/tencentyun/cos-go-sdk-v5"
    _ "github.com/tencentyun/cos-go-sdk-v5"
    yaml "gopkg.in/yaml.v2"
    "io/ioutil"
    "log"
    "net/http"
    "net/url"
    "os"
    "strings"
    "sync"
    "time"
    "path/filepath"
)

// MySQLConfig 定义 MySQL 的配置结构
type MySQLConfig struct {
    Host     string `yaml:"host"`
    Port     int    `yaml:"port"`
    User     string `yaml:"user"`
    Password string `yaml:"password"`
    Database string `yaml:"database"`
}

// Cos 腾讯云配置
type Cos struct {
    SecretID  string `yaml:"secretID"`
    SecretKey string `yaml:"secretKey"`
    Region    string `yaml:"region"`
    Catalogue string `yaml:"catalogue"`
}

// QiNiu 七牛云配置
type QiNiu struct {
    AccessKey string `yaml:"accessKey"`
    SecretKey string `yaml:"secretKey"`
    Bucket    string `yaml:"bucket"`
    Catalogue string `yaml:"catalogue"`
    Region    string `yaml:"region"`
}

// Config 定义整体的配置结构
type Config struct {
    MySQL        MySQLConfig `yaml:"mysql"`
    LocalPath    string      `yaml:"localPath"`
    SkipTables   []string    `yaml:"skip_tables"`
    ScheduleTime []string    `yaml:"schedule_time"`
    Cos          Cos         `yaml:"cos"`
    QiNiu        QiNiu       `yaml:"qiNiu"`
    Save         string      `yaml:"save"`
}

// 用于控制并发写入文件的互斥锁
var mu sync.Mutex

/*
单表备份 - 先备份表结构,然后根据配置决定是否备份数据
*/
func backupTable(db *sql.DB, tableName string, wg *sync.WaitGroup, sem chan struct{}, file *os.File, skipData bool) {
    // 在函数结束时调用 Done
    defer wg.Done()

    // 先备份表结构
    err := backupTableStructure(db, tableName, file)
    if err != nil {
        fmt.Printf("Error backing up table structure for %s: %v\n", tableName, err)
        return
    }

    // 如果跳过数据备份,则直接返回
    if skipData {
        fmt.Printf("Table structure backed up for %s (data skipped)\n", tableName)
        return
    }

    // 控制并发量
    sem <- struct{}{}
    defer func() { <-sem }()

    // 分批次备份数据
    offset := 0
    limit := 1000
    for {
        query := fmt.Sprintf("SELECT * FROM `%s` LIMIT %d OFFSET %d", tableName, limit, offset)
        rows, err := db.Query(query)
        if err != nil {
            fmt.Printf("Error querying table %s: %v\n", tableName, err)
            return
        }

        columns, errColumns := rows.Columns()
        if errColumns != nil {
            fmt.Printf("Error getting columns for table %s: %v\n", tableName, err)
            rows.Close()
            return
        }

        values := make([]sql.RawBytes, len(columns))
        scanArgs := make([]interface{}, len(values))
        for i := range values {
            scanArgs[i] = &values[i]
        }

        rowCount := 0
        for rows.Next() {
            err = rows.Scan(scanArgs...)
            if err != nil {
                fmt.Printf("Error scanning row for table %s: %v\n", tableName, err)
                rows.Close()
                return
            }

            var valueStrings []string
            for _, value := range values {
                // 处理特殊字符转义,防止SQL注入
                escapedValue := strings.Replace(string(value), "'", "''", -1)
                valueStrings = append(valueStrings, "'"+escapedValue+"'")
            }
            insertStmt := fmt.Sprintf("INSERT INTO `%s` (`%s`) VALUES (%s);\n", tableName, strings.Join(columns, "`, `"), strings.Join(valueStrings, ","))

            // 确保并发写入时不冲突
            mu.Lock()
            _, err = file.WriteString(insertStmt)
            mu.Unlock()

            if err != nil {
                fmt.Printf("Error writing to backup file for table %s: %v\n", tableName, err)
                rows.Close()
                return
            }
            rowCount++
        }
        rows.Close()

        if rowCount < limit {
            break // 已经到达表的末尾
        }

        offset += limit
    }

    // 在表数据结束后添加空行分隔
    mu.Lock()
    file.WriteString("\n")
    mu.Unlock()

    fmt.Printf("Backup completed for table %s (structure + data)\n", tableName)
}

/*
备份表结构
*/
func backupTableStructure(db *sql.DB, tableName string, file *os.File) error {
    // 获取表结构
    query := fmt.Sprintf("SHOW CREATE TABLE `%s`", tableName)
    var createTableSQL string
    var tableNameResult string
    err := db.QueryRow(query).Scan(&tableNameResult, &createTableSQL)
    if err != nil {
        return fmt.Errorf("Error getting structure for table %s: %v", tableName, err)
    }

    // 写入表结构到文件
    mu.Lock()
    defer mu.Unlock()

    // 添加删除表的语句(如果已存在)
    dropTableStmt := fmt.Sprintf("DROP TABLE IF EXISTS `%s`;\n", tableName)
    _, err = file.WriteString(dropTableStmt)
    if err != nil {
        return fmt.Errorf("Error writing drop table statement for %s: %v", tableName, err)
    }

    // 写入创建表的语句
    _, err = file.WriteString(createTableSQL + ";\n\n")
    if err != nil {
        return fmt.Errorf("Error writing create table statement for %s: %v", tableName, err)
    }

    return nil
}

/*
操作程序 - 备份所有表的结构,但只备份非跳过表的数据
*/
func backupDatabase() {
    // 读取配置文件
    configFile, err := ioutil.ReadFile("config.yaml")
    if err != nil {
        log.Fatalf("Error reading config file: %v", err)
    }

    var config Config
    err = yaml.Unmarshal(configFile, &config)
    if err != nil {
        log.Fatalf("Error parsing config file: %v", err)
    }

    // 使用配置文件中的 MySQL 连接信息
    dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", config.MySQL.User, config.MySQL.Password, config.MySQL.Host, config.MySQL.Port, config.MySQL.Database)
    db, errOpen := sql.Open("mysql", dsn)
    if errOpen != nil {
        panic(errOpen)
    }
    defer db.Close()

    //清空备份目录中的.sql文件
    if err := clearBackupFiles(config.LocalPath); err != nil {
        log.Fatalf("清空备份文件失败: %v", err)
    }

    //获取本地目录
    localPath := config.LocalPath
    fileName := config.MySQL.Database

    // 获取当前时间并格式化为字符串
    currentTime := time.Now().Format("2006-01-02_15-04-05")
    backupFile := fmt.Sprintf(localPath+"/"+fileName+"_%s.sql", currentTime)
    file, errCreate := os.Create(backupFile)
    if errCreate != nil {
        panic(errCreate)
    }
    defer file.Close()

    // 在文件开头添加备份信息头
    header := fmt.Sprintf("-- MySQL Backup\n-- Database: %s\n-- Backup Time: %s\n-- Skip Tables Data: %v\n\n",
        config.MySQL.Database, time.Now().Format("2006-01-02 15:04:05"), config.SkipTables)
    file.WriteString(header)

    tables, errQuery := db.Query("SHOW TABLES")
    if errQuery != nil {
        panic(errQuery)
    }
    defer tables.Close()

    var wg sync.WaitGroup
    sem := make(chan struct{}, 5) // 控制并发量为5

    var tableName string
    for tables.Next() {
        if err := tables.Scan(&tableName); err != nil {
            panic(err)
        }

        wg.Add(1)
        // 检查是否在跳过的表列表中,如果存在则只备份结构不备份数据
        skipData := contains(config.SkipTables, tableName)
        go backupTable(db, tableName, &wg, sem, file, skipData)
    }

    wg.Wait() // 等待所有的 goroutine 完成

    //获取当前执行目录
    dir, _ := os.Getwd()
    // 获取文件信息
    uploadFilePath := dir + "/" + backupFile
    uploadFileName := strings.Replace(uploadFilePath, dir+"/"+localPath+"/", "", -1)

    //上传至腾讯云Cos
    if config.Save == "cos" {
        saveToCos(uploadFilePath, uploadFileName, config)
    }

    //上传至七牛云
    if config.Save == "qiNiu" {
        saveQiNiu(uploadFilePath, uploadFileName, config)
    }

    //输出
    println("备份完成")
}

// 检查字符串是否在字符串切片中
func contains(slice []string, item string) bool {
    for _, s := range slice {
        if s == item {
            return true
        }
    }
    return false
}

// 读取配置文件
func readConfig() Config {
    configFile, err := ioutil.ReadFile("config.yaml")
    if err != nil {
        log.Fatalf("Error reading config file: %v", err)
    }

    var config Config
    err = yaml.Unmarshal(configFile, &config)
    if err != nil {
        log.Fatalf("Error parsing config file: %v", err)
    }

    return config
}

// 保存到腾讯云Cos
func saveToCos(uploadFilePath, uploadFileName string, config Config) int {
    // 判断文件是否存在
    if _, err := os.Stat(uploadFilePath); os.IsNotExist(err) {
        fmt.Println("【腾讯云cos】本地文件不存在:", uploadFilePath)
        return 404
    }
    //设置请求
    u, _ := url.Parse(config.Cos.Region)
    b := &cos.BaseURL{BucketURL: u}
    client := cos.NewClient(b, &http.Client{
        Transport: &cos.AuthorizationTransport{
            // 通过环境变量获取密钥
            // 环境变量 SECRETID 表示用户的 SecretId,登录访问管理控制台查看密钥,https://console.cloud.tencent.com/cam/capi
            SecretID: config.Cos.SecretID, // 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
            // 环境变量 SECRETKEY 表示用户的 SecretKey,登录访问管理控制台查看密钥,https://console.cloud.tencent.com/cam/capi
            SecretKey: config.Cos.SecretKey, // 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
        },
    })
    //组合存储路径
    key := config.Cos.Catalogue + "/" + config.MySQL.Database + "/" + uploadFileName
    _, _, err := client.Object.Upload(
        context.Background(), key, uploadFilePath, nil,
    )
    //判断上传
    if err != nil {
        fmt.Println("【腾讯云cos】上传失败:", err)
        return 404
    }
    // 删除文件
    err = os.Remove(uploadFilePath)
    if err != nil {
        log.Printf("【腾讯云cos】删除文件失败: %v", err)
    }
    //返回
    fmt.Println("【腾讯云cos】上传成功")
    return 200
}

// 七牛云上传文件
func saveQiNiu(uploadFilePath, uploadFileName string, config Config) int {
    // 判断文件是否存在
    if _, err := os.Stat(uploadFilePath); os.IsNotExist(err) {
        fmt.Println("【七牛云】本地文件不存在:", uploadFilePath)
        return 404
    }
    // 设置存储桶
    putPolicy := storage.PutPolicy{
        Scope: config.QiNiu.Bucket,
    }

    // 设置权限
    mac := auth.New(config.QiNiu.AccessKey, config.QiNiu.SecretKey)
    upToken := putPolicy.UploadToken(mac)

    // 根据配置的区域代码设置Zone
    zone := getQiNiuZoneByRegion(config.QiNiu.Region)

    cfg := storage.Config{
        Zone:          zone,
        UseCdnDomains: false,
        UseHTTPS:      false,
    }

    formUploader := storage.NewFormUploader(&cfg)

    // 构建上传key
    key := fmt.Sprintf("%s/%s/%s",
        strings.Trim(config.QiNiu.Catalogue, "/"),
        strings.Trim(config.MySQL.Database, "/"),
        uploadFileName)

    ret := storage.PutRet{}
    err := formUploader.PutFile(context.Background(), &ret, upToken, key, uploadFilePath, nil)

    if err != nil {
        fmt.Printf("【七牛云】上传失败:%v\n", err)
        fmt.Printf("【调试信息】Bucket: %s, Key: %s, 区域: %s\n", config.QiNiu.Bucket, key, config.QiNiu.Region)
        return 404
    } else {
        fmt.Printf("【七牛云】上传成功!文件Key: %s, 区域: %s\n", ret.Key, config.QiNiu.Region)
        _ = os.Remove(uploadFilePath)
        return 200
    }
}

// 根据区域代码获取对应的Zone【七牛云】
func getQiNiuZoneByRegion(region string) *storage.Zone {
    switch region {
    case "z0": // 华东
        return &storage.ZoneHuadong
    case "z1": // 华北
        return &storage.ZoneHuabei
    case "z2": // 华南
        return &storage.ZoneHuanan
    case "na0": // 北美
        return &storage.ZoneBeimei
    case "as0": // 东南亚
        return &storage.ZoneXinjiapo
    default: // 默认华南
        return &storage.ZoneHuanan
    }
}

/*
检测本地目录
*/
func checkDirectoryPermissions(path string) error {
    //判断是否为空
    if path == "" {
        return fmt.Errorf("请配置本地保存目录")
    }

    // 检查读权限
    dir, err := os.Open(path)
    if err != nil {
        return fmt.Errorf("无法打开目录: %v", err)
    }

    //关闭连接
    defer dir.Close()

    //返回
    return nil
}

/**
清空备份目录中的.sql文件
 */
func clearBackupFiles(localPath string) error {
    // 读取目录中的所有文件
    files, err := ioutil.ReadDir(localPath)
    if err != nil {
        return fmt.Errorf("读取目录失败: %v", err)
    }

    // 遍历文件,删除.sql结尾的文件
    for _, file := range files {
        if !file.IsDir() && strings.HasSuffix(file.Name(), ".sql") {
            filePath := filepath.Join(localPath, file.Name())
            err := os.Remove(filePath)
            if err != nil {
                return fmt.Errorf("删除文件 %s 失败: %v", file.Name(), err)
            }
            fmt.Printf("已删除备份文件: %s\n", file.Name())
        }
    }

    fmt.Println("备份目录已清空")
    return nil
}

// 主程序
func main() {
    //获取数据配置
    config := readConfig()

    //判断是否存在本地保存目录
    if err := checkDirectoryPermissions(config.LocalPath); err != nil {
        log.Fatalf("请检查本地保存目录")
    }

    // 创建cron调度器
    c := cron.New(cron.WithSeconds()) // 支持秒级精度

    // 添加多个定时任务
    for i, cronExpr := range config.ScheduleTime {
        // 使用闭包捕获当前循环变量
        expr := cronExpr
        _, err := c.AddFunc(expr, func() {
            fmt.Printf("Cron job %d triggered by expression: %s\n", i+1, expr)
            backupDatabase()
        })
        if err != nil {
            log.Fatalf("Failed to add cron job %s: %v", cronExpr, err)
        }
        fmt.Printf("Added cron job %d: %s\n", i+1, cronExpr)
    }

    // 启动cron调度器
    c.Start()

    fmt.Printf("Total %d cron jobs started\n", len(config.ScheduleTime))
    fmt.Println("Press Ctrl+C to exit")

    // 保持程序运行
    select {}
}

我们执行这个命令就可以直接运行脚本

go run .

这样就可以根据我们配置文件里面设定的配置进行数据备份。       

0条评论

发表评论