package main import ( "database/sql" "encoding/json" "flag" "fmt" "log" "os" "strings" "time" _ "github.com/go-sql-driver/mysql" ) type duplicateGroup struct { OwnerUserID int64 `json:"owner_user_id"` Count int64 `json:"count"` } type subscriptionRow struct { ID int64 `json:"id"` UserID int64 `json:"user_id"` OrderID int64 `json:"order_id"` SubscribeID int64 `json:"subscribe_id"` ExpireTime time.Time `json:"expire_time"` Traffic int64 `json:"traffic"` Download int64 `json:"download"` Upload int64 `json:"upload"` ExpiredDownload int64 `json:"expired_download"` ExpiredUpload int64 `json:"expired_upload"` Status uint8 `json:"status"` UpdatedAt time.Time `json:"updated_at"` } type mergePlan struct { OwnerUserID int64 `json:"owner_user_id"` Keep subscriptionRow `json:"keep"` Merge []subscriptionRow `json:"merge"` } func main() { dsn := flag.String("dsn", os.Getenv("PPANEL_MYSQL_DSN"), "MySQL DSN; defaults to PPANEL_MYSQL_DSN") execute := flag.Bool("execute", false, "apply changes; default is dry-run") flag.Parse() if strings.TrimSpace(*dsn) == "" { log.Fatal("missing DSN: pass -dsn or set PPANEL_MYSQL_DSN") } db, err := sql.Open("mysql", *dsn) if err != nil { log.Fatal(err) } defer db.Close() groups, err := findDuplicateGroups(db) if err != nil { log.Fatal(err) } plans := make([]mergePlan, 0, len(groups)) for _, group := range groups { plan, err := buildPlan(db, group.OwnerUserID) if err != nil { log.Fatal(err) } if len(plan.Merge) > 0 { plans = append(plans, plan) } } enc := json.NewEncoder(os.Stdout) enc.SetIndent("", " ") if err := enc.Encode(plans); err != nil { log.Fatal(err) } if !*execute { fmt.Fprintf(os.Stderr, "dry-run only: %d duplicate owner groups found\n", len(plans)) return } for _, plan := range plans { if err := applyPlan(db, plan); err != nil { log.Fatal(err) } } fmt.Fprintf(os.Stderr, "merged %d duplicate owner groups\n", len(plans)) } func findDuplicateGroups(db *sql.DB) ([]duplicateGroup, error) { rows, err := db.Query(` SELECT owner_user_id, COUNT(1) AS cnt FROM ( SELECT us.id, COALESCE(uf.owner_user_id, us.user_id) AS owner_user_id FROM user_subscribe us LEFT JOIN user_family_member ufm ON ufm.user_id = us.user_id AND ufm.deleted_at IS NULL AND ufm.status = 1 LEFT JOIN user_family uf ON uf.id = ufm.family_id AND uf.deleted_at IS NULL AND uf.status = 1 WHERE us.token <> '' AND us.status IN (0, 1, 2, 3, 4) ) scoped GROUP BY owner_user_id HAVING COUNT(1) > 1 ORDER BY owner_user_id`) if err != nil { return nil, err } defer rows.Close() var groups []duplicateGroup for rows.Next() { var g duplicateGroup if err := rows.Scan(&g.OwnerUserID, &g.Count); err != nil { return nil, err } groups = append(groups, g) } return groups, rows.Err() } func buildPlan(db *sql.DB, ownerUserID int64) (mergePlan, error) { rows, err := db.Query(` SELECT us.id, us.user_id, us.order_id, us.subscribe_id, us.expire_time, us.traffic, us.download, us.upload, us.expired_download, us.expired_upload, us.status, us.updated_at FROM user_subscribe us LEFT JOIN user_family_member ufm ON ufm.user_id = us.user_id AND ufm.deleted_at IS NULL AND ufm.status = 1 LEFT JOIN user_family uf ON uf.id = ufm.family_id AND uf.deleted_at IS NULL AND uf.status = 1 WHERE COALESCE(uf.owner_user_id, us.user_id) = ? AND us.token <> '' AND us.status IN (0, 1, 2, 3, 4) ORDER BY us.expire_time DESC, us.updated_at DESC, us.id DESC`, ownerUserID) if err != nil { return mergePlan{}, err } defer rows.Close() var all []subscriptionRow for rows.Next() { var r subscriptionRow if err := rows.Scan(&r.ID, &r.UserID, &r.OrderID, &r.SubscribeID, &r.ExpireTime, &r.Traffic, &r.Download, &r.Upload, &r.ExpiredDownload, &r.ExpiredUpload, &r.Status, &r.UpdatedAt); err != nil { return mergePlan{}, err } all = append(all, r) } if err := rows.Err(); err != nil { return mergePlan{}, err } if len(all) == 0 { return mergePlan{OwnerUserID: ownerUserID}, nil } keep := all[0] for _, r := range all[1:] { keep.Download += r.Download keep.Upload += r.Upload keep.ExpiredDownload += r.ExpiredDownload keep.ExpiredUpload += r.ExpiredUpload if r.Traffic > keep.Traffic { keep.Traffic = r.Traffic } } for _, r := range all { if r.UpdatedAt.After(keep.UpdatedAt) { keep.SubscribeID = r.SubscribeID } } return mergePlan{OwnerUserID: ownerUserID, Keep: keep, Merge: all[1:]}, nil } func applyPlan(db *sql.DB, plan mergePlan) error { tx, err := db.Begin() if err != nil { return err } defer tx.Rollback() if _, err = tx.Exec(` UPDATE user_subscribe SET user_id = ?, subscribe_id = ?, traffic = ?, download = ?, upload = ?, expired_download = ?, expired_upload = ?, status = 1, note = CONCAT(COALESCE(note, ''), ' [merged duplicate subscriptions]') WHERE id = ?`, plan.OwnerUserID, plan.Keep.SubscribeID, plan.Keep.Traffic, plan.Keep.Download, plan.Keep.Upload, plan.Keep.ExpiredDownload, plan.Keep.ExpiredUpload, plan.Keep.ID); err != nil { return err } for _, r := range plan.Merge { if _, err = tx.Exec(` UPDATE user_subscribe SET status = 5, note = CONCAT(COALESCE(note, ''), ' [merged into subscription #', ?, ']') WHERE id = ?`, plan.Keep.ID, r.ID); err != nil { return err } } return tx.Commit() }