diff --git a/README-zh.md b/README-zh.md index d06f4f8..b27a40f 100644 --- a/README-zh.md +++ b/README-zh.md @@ -20,6 +20,8 @@ golang 实现的零依赖、支持所有类型、高性能、并发 mysqldump * 自定义 Writer: 如本地文件、多文件储存、远程服务器、云存储等。(默认控制台输出)。 * 支持所有 MYSQL 数据类型. * 支持 INSERT Merge, 大幅提升数据恢复性能 +* 导出支持 批量插入, 大幅提升数据恢复性能 +* 导出支持 触发器 ## QuickStart diff --git a/README.md b/README.md index feb9a7a..a711a0a 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,8 @@ A zero-dependency,all data types are supported, high-performance, concurrent mys * Supports custom Writer: data can be written to any Writer, such as local files, multiple file storage, remote servers, cloud storage, etc. (default console output). * Supports all MySQL data types QuickStart. * Support Merge Insert Option in Source Greatly improve data recovery performance - +* Support multy data in one insert +* Support dump table trigger ## QuickStart diff --git a/go.mod b/go.mod index fa0a6f3..d512b17 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/jarvanstack/mysqldump +module github.com/songdemei/mysqldump go 1.18 diff --git a/mysqldump.go b/mysqldump.go index ede4e49..61470a3 100644 --- a/mysqldump.go +++ b/mysqldump.go @@ -18,6 +18,8 @@ func init() { log.SetFlags(log.Lshortfile | log.LstdFlags) } +var version string = "v0.11.0" + type dumpOption struct { // 导出表数据 isData bool @@ -31,11 +33,27 @@ type dumpOption struct { isAllTable bool // 是否删除表 isDropTable bool + // 是否增加选库脚本,多库导出时,此设置默认开启 + isUseDb bool + + //批量插入,提高导出效率 + perDataNumber int // writer 默认为 os.Stdout writer io.Writer + //是否输出日志 + logOut bool +} +type triggerStruct struct { + Trigger string + Event string + Table string + Statement string + Timing string } +var allTriggers map[string][]triggerStruct + type DumpOption func(*dumpOption) // 删除表 @@ -59,6 +77,13 @@ func WithAllDatabases() DumpOption { } } +// 是否增加指定库语句 如果多库,此设置无效 +func WithUseDb() DumpOption { + return func(option *dumpOption) { + option.isUseDb = true + } +} + // 导出指定数据库, 与 WithAllDatabases 互斥, WithAllDatabases 优先级高 func WithDBs(databases ...string) DumpOption { return func(option *dumpOption) { @@ -80,6 +105,13 @@ func WithAllTable() DumpOption { } } +// 批量insert +func WithMultyInsert(num int) DumpOption { + return func(option *dumpOption) { + option.perDataNumber = num + } +} + // 导出到指定 writer func WithWriter(writer io.Writer) DumpOption { return func(option *dumpOption) { @@ -87,20 +119,33 @@ func WithWriter(writer io.Writer) DumpOption { } } +// 是否输出日志 +// @TODO: 后续增加日志的handle用于输出到其他地方 +func WithLogOut(logOut bool) DumpOption { + return func(option *dumpOption) { + option.logOut = logOut + } +} + func Dump(dns string, opts ...DumpOption) error { + + var err error + + var o dumpOption // 打印开始 start := time.Now() - log.Printf("[info] [dump] start at %s\n", start.Format("2006-01-02 15:04:05")) + if o.logOut { + log.Printf("[info] [dump] start at %s\n", start.Format("2006-01-02 15:04:05")) + } + // 打印结束 defer func() { end := time.Now() - log.Printf("[info] [dump] end at %s, cost %s\n", end.Format("2006-01-02 15:04:05"), end.Sub(start)) + if o.logOut { + log.Printf("[info] [dump] end at %s, cost %s\n", end.Format("2006-01-02 15:04:05"), end.Sub(start)) + } }() - var err error - - var o dumpOption - for _, opt := range opts { opt(&o) } @@ -116,7 +161,6 @@ func Dump(dns string, opts ...DumpOption) error { dbName, } } - if len(o.tables) == 0 { // 默认包含全部表 o.isAllTable = true @@ -133,14 +177,17 @@ func Dump(dns string, opts ...DumpOption) error { // 打印 Header buf.WriteString("-- ----------------------------\n") buf.WriteString("-- MySQL Database Dump\n") + buf.WriteString("-- GoMysqlDump version: " + version + "\n") buf.WriteString("-- Start Time: " + start.Format("2006-01-02 15:04:05") + "\n") buf.WriteString("-- ----------------------------\n") buf.WriteString("\n\n") - + buf.WriteString("/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;\n") // 连接数据库 db, err := sql.Open("mysql", dns) if err != nil { - log.Printf("[error] %v \n", err) + if o.logOut { + log.Printf("[error] %v \n", err) + } return err } defer db.Close() @@ -150,18 +197,24 @@ func Dump(dns string, opts ...DumpOption) error { if o.isAllDB { dbs, err = getDBs(db) if err != nil { - log.Printf("[error] %v \n", err) + if o.logOut { + log.Printf("[error] %v \n", err) + } return err } } else { dbs = o.dbs } - + if len(dbs) > 1 { + o.isUseDb = true + } // 2. 获取表 for _, dbStr := range dbs { _, err = db.Exec(fmt.Sprintf("USE `%s`", dbStr)) if err != nil { - log.Printf("[error] %v \n", err) + if o.logOut { + log.Printf("[error] %v \n", err) + } return err } @@ -169,38 +222,75 @@ func Dump(dns string, opts ...DumpOption) error { if o.isAllTable { tmp, err := getAllTables(db) if err != nil { - log.Printf("[error] %v \n", err) + if o.logOut { + log.Printf("[error] %v \n", err) + } return err } tables = tmp } else { tables = o.tables } - - buf.WriteString(fmt.Sprintf("USE `%s`;\n", dbStr)) + if o.isUseDb { + //多库导出时,才会增加选库操作,否则不加选库操作 + buf.WriteString(fmt.Sprintf("USE `%s`;\n", dbStr)) + } // 3. 导出表 for _, table := range tables { - // 删除表 - if o.isDropTable { - buf.WriteString(fmt.Sprintf("DROP TABLE IF EXISTS `%s`;\n", table)) - } - // 导出表结构 - err = writeTableStruct(db, table, buf) + tt, err := getTableType(db, table) if err != nil { - log.Printf("[error] %v \n", err) return err } - // 导出表数据 - if o.isData { - err = writeTableData(db, table, buf) + if tt == "TABLE" { + // 删除表 + if o.isDropTable { + buf.WriteString(fmt.Sprintf("DROP TABLE IF EXISTS `%s`;\n", table)) + } + + // 导出表结构 + err = writeTableStruct(db, table, buf) if err != nil { - log.Printf("[error] %v \n", err) + if o.logOut { + log.Printf("[error] %v \n", err) + } + return err + } + // 导出表数据 + if o.isData { + err = writeTableData(db, table, buf, o.perDataNumber) + if err != nil { + if o.logOut { + log.Printf("[error] %v \n", err) + } + return err + } + } + err := writeTableTrigger(db, table, buf) + if err != nil { + if o.logOut { + log.Printf("[error] %v \n", err) + } return err } } + if tt == "VIEW" { + // 删除视图 + if o.isDropTable { + buf.WriteString(fmt.Sprintf("DROP VIEW IF EXISTS `%s`;\n", table)) + } + // 导出视图结构 + err = writeViewStruct(db, table, buf) + if err != nil { + if o.logOut { + log.Printf("[error] %v \n", err) + } + return err + } + } + } } @@ -215,9 +305,27 @@ func Dump(dns string, opts ...DumpOption) error { return nil } +func getTableType(db *sql.DB, table string) (t string, err error) { + query := fmt.Sprintf("SELECT TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = '%s'", table) + var tableType string + err = db.QueryRow(query).Scan(&tableType) + if err != nil { + return "", err + } + switch tableType { + case "BASE TABLE": + return "TABLE", nil + case "VIEW": + return "VIEW", nil + default: + return "", nil + } +} func getCreateTableSQL(db *sql.DB, table string) (string, error) { + var createTableSQL string + err := db.QueryRow(fmt.Sprintf("SHOW CREATE TABLE `%s`", table)).Scan(&table, &createTableSQL) if err != nil { return "", err @@ -273,7 +381,6 @@ func writeTableStruct(db *sql.DB, table string, buf *bufio.Writer) error { createTableSQL, err := getCreateTableSQL(db, table) if err != nil { - log.Printf("[error] %v \n", err) return err } buf.WriteString(createTableSQL) @@ -284,16 +391,38 @@ func writeTableStruct(db *sql.DB, table string, buf *bufio.Writer) error { return nil } -func writeTableData(db *sql.DB, table string, buf *bufio.Writer) error { +func writeViewStruct(db *sql.DB, table string, buf *bufio.Writer) error { + // 导出视图 + buf.WriteString("-- ----------------------------\n") + buf.WriteString(fmt.Sprintf("-- View structure for %s\n", table)) + buf.WriteString("-- ----------------------------\n") + + var createTableSQL string + var charact string + var connect string + err := db.QueryRow(fmt.Sprintf("SHOW CREATE TABLE `%s`", table)).Scan(&table, &createTableSQL, &charact, &connect) + if err != nil { + return err + } + buf.WriteString(createTableSQL) + buf.WriteString(";") + + buf.WriteString("\n\n") + buf.WriteString("\n\n") + return nil +} + +func writeTableData(db *sql.DB, table string, buf *bufio.Writer, perDataNumber int) error { // 导出表数据 buf.WriteString("-- ----------------------------\n") buf.WriteString(fmt.Sprintf("-- Records of %s\n", table)) buf.WriteString("-- ----------------------------\n") + buf.WriteString(fmt.Sprintf("LOCK TABLES `%s` WRITE;\n", table)) + buf.WriteString(fmt.Sprintf("/*!40000 ALTER TABLE `%s` DISABLE KEYS */;\n", table)) lineRows, err := db.Query(fmt.Sprintf("SELECT * FROM `%s`", table)) if err != nil { - log.Printf("[error] %v \n", err) return err } defer lineRows.Close() @@ -301,17 +430,28 @@ func writeTableData(db *sql.DB, table string, buf *bufio.Writer) error { var columns []string columns, err = lineRows.Columns() if err != nil { - log.Printf("[error] %v \n", err) return err } columnTypes, err := lineRows.ColumnTypes() if err != nil { - log.Printf("[error] %v \n", err) return err } var values [][]interface{} + rowId := 0 + for lineRows.Next() { + ssql := "" + if rowId == 0 || perDataNumber < 2 || rowId%perDataNumber == 0 { + if rowId > 0 { + ssql = ";\n" + } + //表结构 + ssql += "INSERT INTO `" + table + "` (`" + strings.Join(columns, "`,`") + "`) VALUES \n" + } else { + buf.WriteString(",\n") + } + row := make([]interface{}, len(columns)) rowPointers := make([]interface{}, len(columns)) for i := range columns { @@ -319,102 +459,173 @@ func writeTableData(db *sql.DB, table string, buf *bufio.Writer) error { } err = lineRows.Scan(rowPointers...) if err != nil { - log.Printf("[error] %v \n", err) return err } + rowString, err := buildRowData(row, columnTypes) + if err != nil { + return err + } + ssql += "(" + rowString + ")" + rowId += 1 + buf.WriteString(ssql) values = append(values, row) } + buf.WriteString(";\n") + buf.WriteString(fmt.Sprintf("/*!40000 ALTER TABLE `%s` ENABLE KEYS */;\n", table)) + buf.WriteString("UNLOCK TABLES;\n\n") + return nil - for _, row := range values { - ssql := "INSERT INTO `" + table + "` VALUES (" - - for i, col := range row { - if col == nil { - ssql += "NULL" - } else { - Type := columnTypes[i].DatabaseTypeName() - // 去除 UNSIGNED 和空格 - Type = strings.Replace(Type, "UNSIGNED", "", -1) - Type = strings.Replace(Type, " ", "", -1) - switch Type { - case "TINYINT", "SMALLINT", "MEDIUMINT", "INT", "INTEGER", "BIGINT": - if bs, ok := col.([]byte); ok { - ssql += fmt.Sprintf("%s", string(bs)) - } else { - ssql += fmt.Sprintf("%d", col) - } - case "FLOAT", "DOUBLE": - if bs, ok := col.([]byte); ok { - ssql += fmt.Sprintf("%s", string(bs)) - } else { - ssql += fmt.Sprintf("%f", col) - } - case "DECIMAL", "DEC": - ssql += fmt.Sprintf("%s", col) +} - case "DATE": - t, ok := col.(time.Time) - if !ok { - log.Println("DATE 类型转换错误") - return err - } - ssql += fmt.Sprintf("'%s'", t.Format("2006-01-02")) - case "DATETIME": - t, ok := col.(time.Time) - if !ok { - log.Println("DATETIME 类型转换错误") - return err - } - ssql += fmt.Sprintf("'%s'", t.Format("2006-01-02 15:04:05")) - case "TIMESTAMP": - t, ok := col.(time.Time) - if !ok { - log.Println("TIMESTAMP 类型转换错误") - return err - } - ssql += fmt.Sprintf("'%s'", t.Format("2006-01-02 15:04:05")) - case "TIME": - t, ok := col.([]byte) - if !ok { - log.Println("TIME 类型转换错误") - return err - } - ssql += fmt.Sprintf("'%s'", string(t)) - case "YEAR": - t, ok := col.([]byte) - if !ok { - log.Println("YEAR 类型转换错误") - return err - } - ssql += fmt.Sprintf("%s", string(t)) - case "CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT": - ssql += fmt.Sprintf("'%s'", strings.Replace(fmt.Sprintf("%s", col), "'", "''", -1)) - case "BIT", "BINARY", "VARBINARY", "TINYBLOB", "BLOB", "MEDIUMBLOB", "LONGBLOB": - ssql += fmt.Sprintf("0x%X", col) - case "ENUM", "SET": - ssql += fmt.Sprintf("'%s'", col) - case "BOOL", "BOOLEAN": - if col.(bool) { - ssql += "true" - } else { - ssql += "false" - } - case "JSON": - ssql += fmt.Sprintf("'%s'", col) - default: - // unsupported type - log.Printf("unsupported type: %s", Type) - return fmt.Errorf("unsupported type: %s", Type) +func buildRowData(row []interface{}, columnTypes []*sql.ColumnType) (ssql string, err error) { + // var ssql string + for i, col := range row { + if col == nil { + ssql += "NULL" + } else { + Type := columnTypes[i].DatabaseTypeName() + // 去除 UNSIGNED 和空格 + Type = strings.Replace(Type, "UNSIGNED", "", -1) + Type = strings.Replace(Type, " ", "", -1) + switch Type { + case "TINYINT", "SMALLINT", "MEDIUMINT", "INT", "INTEGER", "BIGINT": + if bs, ok := col.([]byte); ok { + ssql += fmt.Sprintf("%s", string(bs)) + } else { + ssql += fmt.Sprintf("%d", col) } - } - if i < len(row)-1 { - ssql += "," + case "FLOAT", "DOUBLE": + if bs, ok := col.([]byte); ok { + ssql += fmt.Sprintf("%s", string(bs)) + } else { + ssql += fmt.Sprintf("%f", col) + } + case "DECIMAL", "DEC": + ssql += fmt.Sprintf("%s", col) + + case "DATE": + t, ok := col.(time.Time) + if !ok { + return "", err + } + ssql += fmt.Sprintf("'%s'", t.Format("2006-01-02")) + case "DATETIME": + t, ok := col.(time.Time) + if !ok { + return "", err + } + ssql += fmt.Sprintf("'%s'", t.Format("2006-01-02 15:04:05")) + case "TIMESTAMP": + t, ok := col.(time.Time) + if !ok { + return "", err + } + ssql += fmt.Sprintf("'%s'", t.Format("2006-01-02 15:04:05")) + case "TIME": + t, ok := col.([]byte) + if !ok { + return "", err + } + ssql += fmt.Sprintf("'%s'", string(t)) + case "YEAR": + t, ok := col.([]byte) + if !ok { + return "", err + } + ssql += fmt.Sprintf("%s", string(t)) + case "CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT": + r := strings.NewReplacer("\n", "\\n", "'", "\\'", "\r", "\\r", "\"", "\\\"") + ssql += fmt.Sprintf("'%s'", r.Replace(fmt.Sprintf("%s", col))) + // ssql += fmt.Sprintf("'%s'", strings.Replace(fmt.Sprintf("%s", col), "'", "''", -1)) + case "BIT", "BINARY", "VARBINARY", "TINYBLOB", "BLOB", "MEDIUMBLOB", "LONGBLOB": + ssql += fmt.Sprintf("0x%X", col) + case "ENUM", "SET": + ssql += fmt.Sprintf("'%s'", col) + case "BOOL", "BOOLEAN": + if col.(bool) { + ssql += "true" + } else { + ssql += "false" + } + case "JSON": + ssql += fmt.Sprintf("'%s'", col) + default: + // unsupported type + return "", fmt.Errorf("unsupported type: %s", Type) } } - ssql += ");\n" - buf.WriteString(ssql) + if i < len(row)-1 { + ssql += "," + } } + return ssql, nil +} - buf.WriteString("\n\n") +func writeTableTrigger(db *sql.DB, table string, buf *bufio.Writer) error { + var sql []string + + triggers, err := getTrigger(db, table) + if err != nil { + return err + } + if len(triggers) > 0 { + sql = append(sql, "-- ----------------------------") + sql = append(sql, fmt.Sprintf("-- Dump table triggers of %s--------", table)) + sql = append(sql, "-- ----------------------------") + } + for _, v := range triggers { + sql = append(sql, "DELIMITER ;;") + sql = append(sql, "/*!50003 SET SESSION SQL_MODE=\"\" */;;") + sql = append(sql, fmt.Sprintf("/*!50003 CREATE TRIGGER `%s` %s %s ON `%s` FOR EACH ROW %s */;;", v.Trigger, v.Timing, v.Event, v.Table, v.Statement)) + sql = append(sql, "DELIMITER ;") + sql = append(sql, "/*!50003 SET SESSION SQL_MODE=@OLD_SQL_MODE */;\n") + } + buf.WriteString(strings.Join(sql, "\n")) return nil } + +func getTrigger(db *sql.DB, table string) (trigger []triggerStruct, err error) { + if allTriggers != nil { + trigger = allTriggers[table] + return trigger, nil + } else { + allTriggers = make(map[string][]triggerStruct) + } + trgs, err := db.Query("SHOW TRIGGERS") + if err != nil { + return trigger, err + } + defer trgs.Close() + + var columns []string + columns, err = trgs.Columns() + + for trgs.Next() { + trgrow := make([]interface{}, len(columns)) + rowPointers := make([]interface{}, len(columns)) + for i := range columns { + rowPointers[i] = &trgrow[i] + } + err = trgs.Scan(rowPointers...) + if err != nil { + return trigger, err + } + var trigger triggerStruct + for k, v := range trgrow { + switch columns[k] { + case "Table": + trigger.Table = fmt.Sprintf("%s", v) + case "Event": + trigger.Event = fmt.Sprintf("%s", v) + case "Trigger": + trigger.Trigger = fmt.Sprintf("%s", v) + case "Statement": + trigger.Statement = fmt.Sprintf("%s", v) + case "Timing": + trigger.Timing = fmt.Sprintf("%s", v) + } + } + allTriggers[trigger.Table] = append(allTriggers[trigger.Table], trigger) + } + return allTriggers[table], nil +}