Skip to content

Commit 8ef8cd2

Browse files
authored
Merge pull request #286 from lukaszbudnik/loader-health-checks
Loader health checks
2 parents 22e908b + eb737f4 commit 8ef8cd2

File tree

11 files changed

+240
-7
lines changed

11 files changed

+240
-7
lines changed

README.md

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,12 @@ curl http://localhost:8080/
3737

3838
Sample HTTP response:
3939

40-
```
41-
{"release":"v2020.1.3","commitSha":"b56a2694fcdb523e0c3f3e79b2d7a1b61f28a91f","commitDate":"2020-10-12T13:22:57+02:00","apiVersions":["v1","v2"]}
40+
```json
41+
{
42+
"release": "refs/tags/v2021.1.0",
43+
"sha": "3ede93745e459e1214513b21ef76d94d09d10ae7",
44+
"apiVersions": ["v2"]
45+
}
4246
```
4347

4448
## /v2 - GraphQL API
@@ -653,6 +657,52 @@ The following metrics are available:
653657
- `migrator_gin_migrations_applied{type="tenant_migrations_total"}` - migrator total tenant migrations applied (for all tenants)
654658
- `migrator_gin_migrations_applied{type="tenant_scripts_total"}` - migrator total tenant scripts applied (for all tenants)
655659

660+
# Health Checks
661+
662+
Health checks are available at `/health` endpoint. migrator implements [Eclipse MicroProfile Health 3.0 RC4](https://download.eclipse.org/microprofile/microprofile-health-3.0-RC4/microprofile-health-spec.html) spec.
663+
664+
A successful response returns HTTP 200 OK code:
665+
666+
```json
667+
{
668+
"status": "UP",
669+
"checks": [
670+
{
671+
"name": "DB",
672+
"status": "UP"
673+
},
674+
{
675+
"name": "Loader",
676+
"status": "UP"
677+
}
678+
]
679+
}
680+
```
681+
682+
In case one of the checks has DOWN status then the overall status is DOWN. Failed check has `data` field which provides more information on why its status is DOWN. Health check will also return HTTP 503 Service Unavailable code:
683+
684+
```json
685+
{
686+
"status": "DOWN",
687+
"checks": [
688+
{
689+
"name": "DB",
690+
"status": "DOWN",
691+
"data": {
692+
"details": "failed to connect to database: dial tcp 127.0.0.1:5432: connect: connection refused"
693+
}
694+
},
695+
{
696+
"name": "Loader",
697+
"status": "DOWN",
698+
"data": {
699+
"details": "open /nosuchdir/migrations: no such file or directory"
700+
}
701+
}
702+
]
703+
}
704+
```
705+
656706
# Tutorials
657707

658708
In this section I provide links to more in-depth migrator tutorials.

coordinator/coordinator.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,15 @@ func (c *coordinator) HealthCheck() types.HealthResponse {
181181
response.Status = types.HealthStatusDown
182182
}
183183

184+
// Loader check
185+
err = c.loader.HealthCheck()
186+
if err == nil {
187+
checks = append(checks, types.HealthChecks{Name: "Loader", Status: types.HealthStatusUp})
188+
} else {
189+
checks = append(checks, types.HealthChecks{Name: "Loader", Status: types.HealthStatusDown, Data: &types.HealthData{Details: err.Error()}})
190+
response.Status = types.HealthStatusDown
191+
}
192+
184193
response.Checks = checks
185194

186195
return response

coordinator/coordinator_mocks.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,26 @@ func (m *mockedDiskLoader) GetSourceMigrations() []types.Migration {
3333
return []types.Migration{m1, m2, m3, m4, m5}
3434
}
3535

36+
func (m *mockedDiskLoader) HealthCheck() error {
37+
return nil
38+
}
39+
3640
func newMockedDiskLoader(_ context.Context, _ *config.Config) loader.Loader {
3741
return &mockedDiskLoader{}
3842
}
3943

44+
type mockedDiskLoaderHealthCheckError struct {
45+
mockedDiskLoader
46+
}
47+
48+
func (m *mockedDiskLoaderHealthCheckError) HealthCheck() error {
49+
return errors.New("trouble maker")
50+
}
51+
52+
func newMockedDiskLoaderHealthCheckError(_ context.Context, _ *config.Config) loader.Loader {
53+
return &mockedDiskLoaderHealthCheckError{}
54+
}
55+
4056
type mockedNotifier struct {
4157
returnError bool
4258
}
@@ -64,6 +80,10 @@ func (m *mockedBrokenCheckSumDiskLoader) GetSourceMigrations() []types.Migration
6480
return []types.Migration{m1}
6581
}
6682

83+
func (m *mockedBrokenCheckSumDiskLoader) HealthCheck() error {
84+
return nil
85+
}
86+
6787
func newBrokenCheckSumMockedDiskLoader(_ context.Context, _ *config.Config) loader.Loader {
6888
return new(mockedBrokenCheckSumDiskLoader)
6989
}
@@ -77,6 +97,10 @@ func (m *mockedDifferentScriptCheckSumMockedDiskLoader) GetSourceMigrations() []
7797
return []types.Migration{m1, m2}
7898
}
7999

100+
func (m *mockedDifferentScriptCheckSumMockedDiskLoader) HealthCheck() error {
101+
return nil
102+
}
103+
80104
func newDifferentScriptCheckSumMockedDiskLoader(_ context.Context, _ *config.Config) loader.Loader {
81105
return new(mockedDifferentScriptCheckSumMockedDiskLoader)
82106
}

coordinator/coordinator_test.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,13 +367,15 @@ func TestCreateTenant(t *testing.T) {
367367
assert.NotNil(t, results.Version)
368368
}
369369

370-
func TestHealthCheckDBOK(t *testing.T) {
370+
func TestHealthCheckDBAndLoaderOK(t *testing.T) {
371371
coordinator := New(context.TODO(), nil, newNoopMetrics(), newMockedConnector, newMockedDiskLoader, newErrorMockedNotifier)
372372
defer coordinator.Dispose()
373373
healthResponse := coordinator.HealthCheck()
374374
assert.Equal(t, types.HealthStatusUp, healthResponse.Status)
375375
assert.Equal(t, "DB", healthResponse.Checks[0].Name)
376376
assert.Equal(t, types.HealthStatusUp, healthResponse.Checks[0].Status)
377+
assert.Equal(t, "Loader", healthResponse.Checks[1].Name)
378+
assert.Equal(t, types.HealthStatusUp, healthResponse.Checks[1].Status)
377379
}
378380

379381
func TestHealthCheckDBKO(t *testing.T) {
@@ -383,4 +385,17 @@ func TestHealthCheckDBKO(t *testing.T) {
383385
assert.Equal(t, types.HealthStatusDown, healthResponse.Status)
384386
assert.Equal(t, "DB", healthResponse.Checks[0].Name)
385387
assert.Equal(t, types.HealthStatusDown, healthResponse.Checks[0].Status)
388+
assert.Equal(t, "Loader", healthResponse.Checks[1].Name)
389+
assert.Equal(t, types.HealthStatusUp, healthResponse.Checks[1].Status)
390+
}
391+
392+
func TestHealthCheckLoaderKO(t *testing.T) {
393+
coordinator := New(context.TODO(), nil, newNoopMetrics(), newMockedConnector, newMockedDiskLoaderHealthCheckError, newErrorMockedNotifier)
394+
defer coordinator.Dispose()
395+
healthResponse := coordinator.HealthCheck()
396+
assert.Equal(t, types.HealthStatusDown, healthResponse.Status)
397+
assert.Equal(t, "DB", healthResponse.Checks[0].Name)
398+
assert.Equal(t, types.HealthStatusUp, healthResponse.Checks[0].Status)
399+
assert.Equal(t, "Loader", healthResponse.Checks[1].Name)
400+
assert.Equal(t, types.HealthStatusDown, healthResponse.Checks[1].Status)
386401
}

loader/azureblob_loader.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,38 @@ func (abl *azureBlobLoader) getAzureStorageCredentials() (azblob.Credential, err
163163
return nil, err
164164
}
165165

166-
err = azureServicePrincipalToken.Refresh()
167-
if err != nil {
168-
return nil, err
169-
}
170166
token := azureServicePrincipalToken.Token()
171167

172168
credential := azblob.NewTokenCredential(token.AccessToken, nil)
173169
return credential, nil
174170
}
171+
172+
func (abl *azureBlobLoader) HealthCheck() error {
173+
credential, err := abl.getAzureStorageCredentials()
174+
if err != nil {
175+
return err
176+
}
177+
178+
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
179+
180+
// check if optional prefixes are provided
181+
baseLocation := strings.TrimRight(abl.config.BaseLocation, "/")
182+
indx := common.FindNthIndex(baseLocation, '/', 4)
183+
184+
prefix := ""
185+
if indx > -1 {
186+
prefix = baseLocation[indx+1:]
187+
baseLocation = baseLocation[:indx]
188+
}
189+
190+
u, err := url.Parse(baseLocation)
191+
if err != nil {
192+
return err
193+
}
194+
195+
containerURL := azblob.NewContainerURL(*u, p)
196+
197+
_, err = containerURL.ListBlobsFlatSegment(abl.ctx, azblob.Marker{}, azblob.ListBlobsSegmentOptions{Prefix: prefix, MaxResults: 1})
198+
199+
return err
200+
}

loader/azureblob_loader_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,43 @@ func TestAzureGetSourceMigrationsWithOptionalPrefix(t *testing.T) {
8989
assert.Contains(t, migrations[11].File, "prod/artefacts/migrations/tenants-scripts/b.sql")
9090

9191
}
92+
93+
func TestAzureHealthCheck(t *testing.T) {
94+
accountName, accountKey := os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY")
95+
96+
if len(accountName) == 0 || len(accountKey) == 0 {
97+
t.Skip("skipping test AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY not set")
98+
}
99+
100+
baseLocation := fmt.Sprintf("https://%v.blob.core.windows.net/myothercontainer/prod/artefacts/", accountName)
101+
102+
config := &config.Config{
103+
BaseLocation: baseLocation,
104+
SingleMigrations: []string{"migrations/config", "migrations/ref"},
105+
TenantMigrations: []string{"migrations/tenants"},
106+
SingleScripts: []string{"migrations/config-scripts"},
107+
TenantScripts: []string{"migrations/tenants-scripts"},
108+
}
109+
110+
loader := &azureBlobLoader{baseLoader{context.TODO(), config}}
111+
err := loader.HealthCheck()
112+
assert.Nil(t, err)
113+
}
114+
115+
func TestAzureMsiCredentials(t *testing.T) {
116+
// in CI/CD env the MSI credentials are not available
117+
// this code just assures that if no shared key envs are present it will fallback to MSI
118+
// unsetting one of the shared key envs will cause fallback to MSI
119+
os.Unsetenv("AZURE_STORAGE_ACCESS_KEY")
120+
121+
config := &config.Config{
122+
BaseLocation: "https://justtesting.blob.core.windows.net/myothercontainer/prod/artefacts/",
123+
SingleMigrations: []string{"migrations/config", "migrations/ref"},
124+
TenantMigrations: []string{"migrations/tenants"},
125+
SingleScripts: []string{"migrations/config-scripts"},
126+
TenantScripts: []string{"migrations/tenants-scripts"},
127+
}
128+
129+
loader := &azureBlobLoader{baseLoader{context.TODO(), config}}
130+
loader.getAzureStorageCredentials()
131+
}

loader/disk_loader.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ func (dl *diskLoader) GetSourceMigrations() []types.Migration {
4646
return migrations
4747
}
4848

49+
func (dl *diskLoader) HealthCheck() error {
50+
absBaseDir, err := filepath.Abs(dl.config.BaseLocation)
51+
if err != nil {
52+
return err
53+
}
54+
_, err = ioutil.ReadDir(absBaseDir)
55+
return err
56+
}
57+
4958
func (dl *diskLoader) getDirs(baseDir string, migrationsDirs []string) []string {
5059
var filteredDirs []string
5160
for _, migrationsDir := range migrationsDirs {

loader/disk_loader_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,12 @@ func TestDiskGetDiskMigrations(t *testing.T) {
8484
assert.Contains(t, migrations[10].File, "test/migrations/tenants-scripts/a.sql")
8585
assert.Contains(t, migrations[11].File, "test/migrations/tenants-scripts/b.sql")
8686
}
87+
88+
func TestDiskHealthCheck(t *testing.T) {
89+
config := &config.Config{
90+
BaseLocation: "/path/to/baseDir",
91+
}
92+
loader := New(context.TODO(), config)
93+
err := loader.HealthCheck()
94+
assert.NotNil(t, err)
95+
}

loader/loader.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
// Loader interface abstracts all loading operations performed by migrator
1414
type Loader interface {
1515
GetSourceMigrations() []types.Migration
16+
HealthCheck() error
1617
}
1718

1819
// Factory is a factory method for creating Loader instance

loader/s3_loader.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,35 @@ func (s3l *s3Loader) GetSourceMigrations() []types.Migration {
2929
return s3l.doGetSourceMigrations(client)
3030
}
3131

32+
func (s3l *s3Loader) HealthCheck() error {
33+
sess, err := session.NewSession()
34+
if err != nil {
35+
return err
36+
}
37+
client := s3.New(sess)
38+
return s3l.doHealthCheck(client)
39+
}
40+
41+
func (s3l *s3Loader) doHealthCheck(client s3iface.S3API) error {
42+
bucketWithPrefixes := strings.Split(strings.Replace(strings.TrimRight(s3l.config.BaseLocation, "/"), "s3://", "", 1), "/")
43+
44+
bucket := bucketWithPrefixes[0]
45+
prefix := "/"
46+
if len(bucketWithPrefixes) > 1 {
47+
prefix = strings.Join(bucketWithPrefixes[1:], "/")
48+
}
49+
50+
input := &s3.ListObjectsV2Input{
51+
Bucket: aws.String(bucket),
52+
Prefix: aws.String(prefix),
53+
MaxKeys: aws.Int64(1),
54+
}
55+
56+
_, err := client.ListObjectsV2(input)
57+
58+
return err
59+
}
60+
3261
func (s3l *s3Loader) doGetSourceMigrations(client s3iface.S3API) []types.Migration {
3362
migrations := []types.Migration{}
3463

0 commit comments

Comments
 (0)