|
1 | 1 | package core |
2 | 2 |
|
3 | 3 | import ( |
| 4 | + "errors" |
4 | 5 | "fmt" |
5 | 6 | "log" |
6 | 7 |
|
@@ -43,152 +44,217 @@ type ExecuteFlags struct { |
43 | 44 |
|
44 | 45 | // GetDiskMigrations is a function which loads all migrations from disk as defined in config passed as first argument |
45 | 46 | // and using loader created by a function passed as second argument |
46 | | -func GetDiskMigrations(config *config.Config, createLoader func(*config.Config) loader.Loader) []types.Migration { |
| 47 | +func GetDiskMigrations(config *config.Config, createLoader func(*config.Config) loader.Loader) ([]types.Migration, error) { |
47 | 48 | loader := createLoader(config) |
48 | | - diskMigrations := loader.GetDiskMigrations() |
49 | | - return diskMigrations |
| 49 | + return loader.GetDiskMigrations() |
50 | 50 | } |
51 | 51 |
|
52 | 52 | // GetDBTenants is a function which loads all tenants for multi-tenant schemas from DB as defined in config passed as first argument |
53 | 53 | // and using connector created by a function passed as second argument |
54 | | -func GetDBTenants(config *config.Config, createConnector func(*config.Config) db.Connector) []string { |
55 | | - connector := createConnector(config) |
56 | | - connector.Init() |
| 54 | +func GetDBTenants(config *config.Config, newConnector func(*config.Config) (db.Connector, error)) ([]string, error) { |
| 55 | + connector, err := newConnector(config) |
| 56 | + if err != nil { |
| 57 | + return nil, err |
| 58 | + } |
| 59 | + if err := connector.Init(); err != nil { |
| 60 | + return nil, err |
| 61 | + } |
57 | 62 | defer connector.Dispose() |
58 | | - dbTenants := connector.GetTenants() |
59 | | - return dbTenants |
| 63 | + return connector.GetTenants() |
60 | 64 | } |
61 | 65 |
|
62 | 66 | // GetDBMigrations is a function which loads all DB migrations for multi-tenant schemas from DB as defined in config passed as first argument |
63 | 67 | // and using connector created by a function passed as second argument |
64 | | -func GetDBMigrations(config *config.Config, createConnector func(*config.Config) db.Connector) []types.MigrationDB { |
65 | | - connector := createConnector(config) |
66 | | - connector.Init() |
| 68 | +func GetDBMigrations(config *config.Config, newConnector func(*config.Config) (db.Connector, error)) ([]types.MigrationDB, error) { |
| 69 | + connector, err := newConnector(config) |
| 70 | + if err != nil { |
| 71 | + return nil, err |
| 72 | + } |
| 73 | + if err := connector.Init(); err != nil { |
| 74 | + return nil, err |
| 75 | + } |
67 | 76 | defer connector.Dispose() |
68 | | - dbMigrations := connector.GetDBMigrations() |
69 | | - return dbMigrations |
| 77 | + return connector.GetDBMigrations() |
70 | 78 | } |
71 | 79 |
|
72 | 80 | // ApplyMigrations is a function which applies disk migrations to DB as defined in config passed as first argument |
73 | 81 | // and using connector created by a function passed as second argument and disk loader created by a function passed as third argument |
74 | | -func ApplyMigrations(config *config.Config, createConnector func(*config.Config) db.Connector, createLoader func(*config.Config) loader.Loader) []types.Migration { |
75 | | - diskMigrations := GetDiskMigrations(config, createLoader) |
| 82 | +func ApplyMigrations(config *config.Config, newConnector func(*config.Config) (db.Connector, error), createLoader func(*config.Config) loader.Loader) (migrationsToApply []types.Migration, err error) { |
| 83 | + diskMigrations, err := GetDiskMigrations(config, createLoader) |
| 84 | + if err != nil { |
| 85 | + return |
| 86 | + } |
76 | 87 | log.Printf("Read disk migrations: %d", len(diskMigrations)) |
77 | 88 |
|
78 | | - dbMigrations := GetDBMigrations(config, createConnector) |
| 89 | + dbMigrations, err := GetDBMigrations(config, newConnector) |
| 90 | + if err != nil { |
| 91 | + return |
| 92 | + } |
79 | 93 | log.Printf("Read DB migrations: %d", len(dbMigrations)) |
80 | 94 |
|
81 | | - migrationsToApply := migrations.ComputeMigrationsToApply(diskMigrations, dbMigrations) |
| 95 | + migrationsToApply = migrations.ComputeMigrationsToApply(diskMigrations, dbMigrations) |
82 | 96 | log.Printf("Found migrations to apply: %d", len(migrationsToApply)) |
83 | 97 |
|
84 | | - doApplyMigrations(migrationsToApply, config, createConnector) |
85 | | - |
86 | | - notifier := notifications.CreateNotifier(config) |
87 | | - text := fmt.Sprintf("Migrations applied: %d", len(migrationsToApply)) |
88 | | - resp, err := notifier.Notify(text) |
89 | | - |
| 98 | + err = doApplyMigrations(migrationsToApply, config, newConnector) |
90 | 99 | if err != nil { |
91 | | - log.Printf("Notifier err: %v", err) |
92 | | - } else { |
93 | | - log.Printf("Notifier response: %v", resp) |
| 100 | + return |
94 | 101 | } |
95 | 102 |
|
96 | | - return migrationsToApply |
| 103 | + text := fmt.Sprintf("Migrations applied: %d", len(migrationsToApply)) |
| 104 | + sendNotification(config, text) |
| 105 | + |
| 106 | + return |
97 | 107 | } |
98 | 108 |
|
99 | 109 | // AddTenant creates new tenant in DB and applies all tenant migrations |
100 | | -func AddTenant(tenant string, config *config.Config, createConnector func(*config.Config) db.Connector, createLoader func(*config.Config) loader.Loader) []types.Migration { |
| 110 | +func AddTenant(tenant string, config *config.Config, newConnector func(*config.Config) (db.Connector, error), createLoader func(*config.Config) loader.Loader) (migrationsToApply []types.Migration, err error) { |
101 | 111 |
|
102 | | - diskMigrations := GetDiskMigrations(config, createLoader) |
| 112 | + diskMigrations, err := GetDiskMigrations(config, createLoader) |
| 113 | + if err != nil { |
| 114 | + return |
| 115 | + } |
103 | 116 | log.Printf("Read disk migrations: %d", len(diskMigrations)) |
104 | 117 |
|
105 | 118 | // filter only tenant schemas |
106 | | - // var migrationsToApply []types.Migration |
107 | | - migrationsToApply := migrations.FilterTenantMigrations(diskMigrations) |
| 119 | + migrationsToApply = migrations.FilterTenantMigrations(diskMigrations) |
108 | 120 | log.Printf("Found migrations to apply: %d", len(migrationsToApply)) |
109 | 121 |
|
110 | | - doAddTenantAndApplyMigrations(tenant, migrationsToApply, config, createConnector) |
111 | | - |
112 | | - notifier := notifications.CreateNotifier(config) |
113 | | - text := fmt.Sprintf("Tenant %q added, migrations applied: %d", tenant, len(migrationsToApply)) |
114 | | - resp, err := notifier.Notify(text) |
115 | | - |
| 122 | + err = doAddTenantAndApplyMigrations(tenant, migrationsToApply, config, newConnector) |
116 | 123 | if err != nil { |
117 | | - log.Printf("Notifier err: %v", err) |
118 | | - } else { |
119 | | - log.Printf("Notifier response: %v", resp) |
| 124 | + return |
120 | 125 | } |
121 | 126 |
|
122 | | - return diskMigrations |
| 127 | + text := fmt.Sprintf("Tenant %q added, migrations applied: %d", tenant, len(migrationsToApply)) |
| 128 | + sendNotification(config, text) |
| 129 | + |
| 130 | + return |
123 | 131 | } |
124 | 132 |
|
125 | 133 | // VerifyMigrations loads disk and db migrations and verifies their checksums |
126 | 134 | // see migrations.VerifyCheckSums for more information |
127 | | -func VerifyMigrations(config *config.Config, createConnector func(*config.Config) db.Connector, createLoader func(*config.Config) loader.Loader) (bool, []types.Migration) { |
128 | | - diskMigrations := GetDiskMigrations(config, createLoader) |
129 | | - dbMigrations := GetDBMigrations(config, createConnector) |
130 | | - return migrations.VerifyCheckSums(diskMigrations, dbMigrations) |
| 135 | +func VerifyMigrations(config *config.Config, newConnector func(*config.Config) (db.Connector, error), createLoader func(*config.Config) loader.Loader) (bool, []types.Migration, error) { |
| 136 | + diskMigrations, err := GetDiskMigrations(config, createLoader) |
| 137 | + if err != nil { |
| 138 | + return false, []types.Migration{}, err |
| 139 | + } |
| 140 | + |
| 141 | + dbMigrations, err := GetDBMigrations(config, newConnector) |
| 142 | + if err != nil { |
| 143 | + return false, []types.Migration{}, err |
| 144 | + } |
| 145 | + verified, offendingMigrations := migrations.VerifyCheckSums(diskMigrations, dbMigrations) |
| 146 | + return verified, offendingMigrations, nil |
131 | 147 | } |
132 | 148 |
|
133 | 149 | // ExecuteMigrator is a function which executes actions on resources defined in config passed as first argument action defined as second argument |
134 | 150 | // and using connector created by a function passed as third argument and disk loader created by a function passed as fourth argument |
135 | 151 | func ExecuteMigrator(config *config.Config, executeFlags ExecuteFlags) { |
136 | | - doExecuteMigrator(config, executeFlags, db.CreateConnector, loader.CreateLoader) |
| 152 | + err := doExecuteMigrator(config, executeFlags, db.NewConnector, loader.NewLoader) |
| 153 | + if err != nil { |
| 154 | + log.Printf("Error encountered: %v", err) |
| 155 | + } |
137 | 156 | } |
138 | 157 |
|
139 | | -func doExecuteMigrator(config *config.Config, executeFlags ExecuteFlags, createConnector func(*config.Config) db.Connector, createLoader func(*config.Config) loader.Loader) { |
| 158 | +func doExecuteMigrator(config *config.Config, executeFlags ExecuteFlags, newConnector func(*config.Config) (db.Connector, error), createLoader func(*config.Config) loader.Loader) error { |
140 | 159 | switch executeFlags.Action { |
141 | 160 | case PrintConfigAction: |
142 | 161 | log.Printf("Configuration file ==>\n%v\n", config) |
143 | 162 | case GetDiskMigrationsAction: |
144 | | - diskMigrations := GetDiskMigrations(config, createLoader) |
| 163 | + diskMigrations, err := GetDiskMigrations(config, createLoader) |
| 164 | + if err != nil { |
| 165 | + return err |
| 166 | + } |
145 | 167 | if len(diskMigrations) > 0 { |
146 | 168 | log.Printf("List of disk migrations\n%v", utils.MigrationArrayToString(diskMigrations)) |
147 | 169 | } |
148 | 170 | case GetDBMigrationsAction: |
149 | | - dbMigrations := GetDBMigrations(config, createConnector) |
| 171 | + dbMigrations, err := GetDBMigrations(config, newConnector) |
| 172 | + if err != nil { |
| 173 | + return err |
| 174 | + } |
150 | 175 | log.Printf("Read DB migrations: %d", len(dbMigrations)) |
151 | 176 | if len(dbMigrations) > 0 { |
152 | 177 | log.Printf("List of db migrations\n%v", utils.MigrationDBArrayToString(dbMigrations)) |
153 | 178 | } |
154 | 179 | case AddTenantAction: |
155 | | - verified, offendingMigrations := VerifyMigrations(config, createConnector, createLoader) |
| 180 | + verified, offendingMigrations, err := VerifyMigrations(config, newConnector, createLoader) |
| 181 | + if err != nil { |
| 182 | + return err |
| 183 | + } |
156 | 184 | if !verified { |
157 | 185 | log.Printf("Checksum verification failed.") |
158 | 186 | log.Printf("List of offending disk migrations\n%v", utils.MigrationArrayToString(offendingMigrations)) |
159 | | - } else { |
160 | | - AddTenant(executeFlags.Tenant, config, createConnector, createLoader) |
| 187 | + return errors.New("Checksum verification failed") |
| 188 | + } |
| 189 | + |
| 190 | + migrationsApplied, err := AddTenant(executeFlags.Tenant, config, newConnector, createLoader) |
| 191 | + if err != nil { |
| 192 | + return err |
| 193 | + } |
| 194 | + if len(migrationsApplied) > 0 { |
| 195 | + log.Printf("List of migrations applied\n%v", utils.MigrationArrayToString(migrationsApplied)) |
161 | 196 | } |
162 | 197 | case GetDBTenantsAction: |
163 | | - dbTenants := GetDBTenants(config, createConnector) |
| 198 | + dbTenants, err := GetDBTenants(config, newConnector) |
| 199 | + if err != nil { |
| 200 | + return err |
| 201 | + } |
164 | 202 | log.Printf("Read DB tenants: %d", len(dbTenants)) |
165 | 203 | if len(dbTenants) > 0 { |
166 | 204 | log.Printf("List of db tenants\n%v", utils.TenantArrayToString(dbTenants)) |
167 | 205 | } |
168 | 206 | case ApplyAction: |
169 | | - verified, offendingMigrations := VerifyMigrations(config, createConnector, createLoader) |
| 207 | + verified, offendingMigrations, err := VerifyMigrations(config, newConnector, createLoader) |
| 208 | + if err != nil { |
| 209 | + return err |
| 210 | + } |
170 | 211 | if !verified { |
171 | 212 | log.Printf("Checksum verification failed.") |
172 | 213 | log.Printf("List of offending disk migrations\n%v", utils.MigrationArrayToString(offendingMigrations)) |
173 | | - } else { |
174 | | - migrationsApplied := ApplyMigrations(config, createConnector, createLoader) |
175 | | - if len(migrationsApplied) > 0 { |
176 | | - log.Printf("List of migrations applied\n%v", utils.MigrationArrayToString(migrationsApplied)) |
177 | | - } |
| 214 | + return errors.New("Checksum verification failed") |
| 215 | + } |
| 216 | + migrationsApplied, err := ApplyMigrations(config, newConnector, createLoader) |
| 217 | + if err != nil { |
| 218 | + return err |
| 219 | + } |
| 220 | + if len(migrationsApplied) > 0 { |
| 221 | + log.Printf("List of migrations applied\n%v", utils.MigrationArrayToString(migrationsApplied)) |
178 | 222 | } |
179 | 223 | } |
| 224 | + return nil |
180 | 225 | } |
181 | 226 |
|
182 | | -func doApplyMigrations(migrationsToApply []types.Migration, config *config.Config, createConnector func(*config.Config) db.Connector) { |
183 | | - connector := createConnector(config) |
184 | | - connector.Init() |
| 227 | +func doApplyMigrations(migrationsToApply []types.Migration, config *config.Config, newConnector func(*config.Config) (db.Connector, error)) error { |
| 228 | + connector, err := newConnector(config) |
| 229 | + if err != nil { |
| 230 | + return err |
| 231 | + } |
| 232 | + if err := connector.Init(); err != nil { |
| 233 | + return err |
| 234 | + } |
185 | 235 | defer connector.Dispose() |
186 | | - connector.ApplyMigrations(migrationsToApply) |
| 236 | + return connector.ApplyMigrations(migrationsToApply) |
187 | 237 | } |
188 | 238 |
|
189 | | -func doAddTenantAndApplyMigrations(tenant string, migrationsToApply []types.Migration, config *config.Config, createConnector func(*config.Config) db.Connector) { |
190 | | - connector := createConnector(config) |
191 | | - connector.Init() |
| 239 | +func doAddTenantAndApplyMigrations(tenant string, migrationsToApply []types.Migration, config *config.Config, newConnector func(*config.Config) (db.Connector, error)) error { |
| 240 | + connector, err := newConnector(config) |
| 241 | + if err != nil { |
| 242 | + return err |
| 243 | + } |
| 244 | + if err := connector.Init(); err != nil { |
| 245 | + return err |
| 246 | + } |
192 | 247 | defer connector.Dispose() |
193 | | - connector.AddTenantAndApplyMigrations(tenant, migrationsToApply) |
| 248 | + return connector.AddTenantAndApplyMigrations(tenant, migrationsToApply) |
| 249 | +} |
| 250 | + |
| 251 | +func sendNotification(config *config.Config, text string) { |
| 252 | + notifier := notifications.CreateNotifier(config) |
| 253 | + resp, err := notifier.Notify(text) |
| 254 | + |
| 255 | + if err != nil { |
| 256 | + log.Printf("Notifier err: %v", err) |
| 257 | + } else { |
| 258 | + log.Printf("Notifier response: %v", resp) |
| 259 | + } |
194 | 260 | } |
0 commit comments