Skip to content

Commit b718bf6

Browse files
author
Shlomi Noach
authored
Merge pull request #283 from github/validate-table-master-replica
validating table structure on applier and migrator
2 parents 8fbff65 + bf92eec commit b718bf6

6 files changed

Lines changed: 63 additions & 39 deletions

File tree

build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#
33
#
44

5-
RELEASE_VERSION="1.0.23"
5+
RELEASE_VERSION="1.0.26"
66

77
function build {
88
osname=$1

go/base/context.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ type MigrationContext struct {
147147
UserCommandedUnpostponeFlag int64
148148
PanicAbort chan error
149149

150+
OriginalTableColumnsOnApplier *sql.ColumnList
150151
OriginalTableColumns *sql.ColumnList
151152
OriginalTableUniqueKeys [](*sql.UniqueKey)
152153
GhostTableColumns *sql.ColumnList

go/logic/applier.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ func (this *Applier) InitDBConnections() (err error) {
6767
} else {
6868
this.connectionConfig.ImpliedKey = impliedKey
6969
}
70+
if err := this.readTableColumns(); err != nil {
71+
return err
72+
}
7073
return nil
7174
}
7275

@@ -95,6 +98,16 @@ func (this *Applier) validateAndReadTimeZone() error {
9598
return nil
9699
}
97100

101+
// readTableColumns reads table columns on applier
102+
func (this *Applier) readTableColumns() (err error) {
103+
log.Infof("Examining table structure on applier")
104+
this.migrationContext.OriginalTableColumnsOnApplier, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
105+
if err != nil {
106+
return err
107+
}
108+
return nil
109+
}
110+
98111
// showTableStatus returns the output of `show table status like '...'` command
99112
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
100113
rowMap = nil

go/logic/inspect.go

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package logic
88
import (
99
gosql "database/sql"
1010
"fmt"
11+
"reflect"
1112
"strings"
1213
"sync/atomic"
1314

@@ -83,7 +84,7 @@ func (this *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (colum
8384
if len(uniqueKeys) == 0 {
8485
return columns, uniqueKeys, fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out")
8586
}
86-
columns, err = this.getTableColumns(this.migrationContext.DatabaseName, tableName)
87+
columns, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, tableName)
8788
if err != nil {
8889
return columns, uniqueKeys, err
8990
}
@@ -99,9 +100,15 @@ func (this *Inspector) InspectOriginalTable() (err error) {
99100
return nil
100101
}
101102

102-
// InspectOriginalAndGhostTables compares original and ghost tables to see whether the migration
103+
// inspectOriginalAndGhostTables compares original and ghost tables to see whether the migration
103104
// makes sense and is valid. It extracts the list of shared columns and the chosen migration unique key
104-
func (this *Inspector) InspectOriginalAndGhostTables() (err error) {
105+
func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
106+
originalNamesOnApplier := this.migrationContext.OriginalTableColumnsOnApplier.Names()
107+
originalNames := this.migrationContext.OriginalTableColumns.Names()
108+
if !reflect.DeepEqual(originalNames, originalNamesOnApplier) {
109+
return fmt.Errorf("It seems like table structure is not identical between master and replica. This scenario is not supported.")
110+
}
111+
105112
this.migrationContext.GhostTableColumns, this.migrationContext.GhostTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.GetGhostTableName())
106113
if err != nil {
107114
return err
@@ -478,31 +485,6 @@ func (this *Inspector) CountTableRows() error {
478485
return nil
479486
}
480487

481-
// getTableColumns reads column list from given table
482-
func (this *Inspector) getTableColumns(databaseName, tableName string) (*sql.ColumnList, error) {
483-
query := fmt.Sprintf(`
484-
show columns from %s.%s
485-
`,
486-
sql.EscapeName(databaseName),
487-
sql.EscapeName(tableName),
488-
)
489-
columnNames := []string{}
490-
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
491-
columnNames = append(columnNames, rowMap.GetString("Field"))
492-
return nil
493-
})
494-
if err != nil {
495-
return nil, err
496-
}
497-
if len(columnNames) == 0 {
498-
return nil, log.Errorf("Found 0 columns on %s.%s. Bailing out",
499-
sql.EscapeName(databaseName),
500-
sql.EscapeName(tableName),
501-
)
502-
}
503-
return sql.NewColumnList(columnNames), nil
504-
}
505-
506488
// applyColumnTypes
507489
func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsLists ...*sql.ColumnList) error {
508490
query := `

go/logic/migrator.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
type ChangelogState string
2727

2828
const (
29-
TablesInPlace ChangelogState = "TablesInPlace"
29+
GhostTableMigrated ChangelogState = "GhostTableMigrated"
3030
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
3131
)
3232

@@ -58,7 +58,7 @@ type Migrator struct {
5858
migrationContext *base.MigrationContext
5959

6060
firstThrottlingCollected chan bool
61-
tablesInPlace chan bool
61+
ghostTableMigrated chan bool
6262
rowCopyComplete chan bool
6363
allEventsUpToLockProcessed chan bool
6464

@@ -76,7 +76,7 @@ func NewMigrator() *Migrator {
7676
migrator := &Migrator{
7777
migrationContext: base.GetMigrationContext(),
7878
parser: sql.NewParser(),
79-
tablesInPlace: make(chan bool),
79+
ghostTableMigrated: make(chan bool),
8080
firstThrottlingCollected: make(chan bool, 1),
8181
rowCopyComplete: make(chan bool),
8282
allEventsUpToLockProcessed: make(chan bool),
@@ -182,9 +182,9 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
182182
}
183183
changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
184184
switch changelogState {
185-
case TablesInPlace:
185+
case GhostTableMigrated:
186186
{
187-
this.tablesInPlace <- true
187+
this.ghostTableMigrated <- true
188188
}
189189
case AllEventsUpToLockProcessed:
190190
{
@@ -291,14 +291,14 @@ func (this *Migrator) Migrate() (err error) {
291291
return err
292292
}
293293

294-
log.Infof("Waiting for tables to be in place")
295-
<-this.tablesInPlace
296-
log.Debugf("Tables are in place")
294+
log.Infof("Waiting for ghost table to be migrated")
295+
<-this.ghostTableMigrated
296+
log.Debugf("ghost table migrated")
297297
// Yay! We now know the Ghost and Changelog tables are good to examine!
298298
// When running on replica, this means the replica has those tables. When running
299299
// on master this is always true, of course, and yet it also implies this knowledge
300300
// is in the binlogs.
301-
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
301+
if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
302302
return err
303303
}
304304
// Validation complete! We're good to execute this migration
@@ -926,12 +926,13 @@ func (this *Migrator) initiateApplier() error {
926926
log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
927927
return err
928928
}
929+
929930
if err := this.applier.AlterGhost(); err != nil {
930931
log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
931932
return err
932933
}
933934

934-
this.applier.WriteChangelogState(string(TablesInPlace))
935+
this.applier.WriteChangelogState(string(GhostTableMigrated))
935936
go this.applier.InitiateHeartbeat()
936937
return nil
937938
}

go/mysql/utils.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"fmt"
1111
"time"
1212

13+
"github.com/github/gh-ost/go/sql"
14+
1315
"github.com/outbrain/golib/log"
1416
"github.com/outbrain/golib/sqlutils"
1517
)
@@ -149,3 +151,28 @@ func GetInstanceKey(db *gosql.DB) (instanceKey *InstanceKey, err error) {
149151
err = db.QueryRow(`select @@global.hostname, @@global.port`).Scan(&instanceKey.Hostname, &instanceKey.Port)
150152
return instanceKey, err
151153
}
154+
155+
// GetTableColumns reads column list from given table
156+
func GetTableColumns(db *gosql.DB, databaseName, tableName string) (*sql.ColumnList, error) {
157+
query := fmt.Sprintf(`
158+
show columns from %s.%s
159+
`,
160+
sql.EscapeName(databaseName),
161+
sql.EscapeName(tableName),
162+
)
163+
columnNames := []string{}
164+
err := sqlutils.QueryRowsMap(db, query, func(rowMap sqlutils.RowMap) error {
165+
columnNames = append(columnNames, rowMap.GetString("Field"))
166+
return nil
167+
})
168+
if err != nil {
169+
return nil, err
170+
}
171+
if len(columnNames) == 0 {
172+
return nil, log.Errorf("Found 0 columns on %s.%s. Bailing out",
173+
sql.EscapeName(databaseName),
174+
sql.EscapeName(tableName),
175+
)
176+
}
177+
return sql.NewColumnList(columnNames), nil
178+
}

0 commit comments

Comments
 (0)