Skip to content
This repository was archived by the owner on Apr 4, 2021. It is now read-only.

Commit 830ab36

Browse files
committed
FALCON-1566 Add test for SLA monitoring API
Author: Pragya <[email protected]> Reviewers: Paul Isaychuk <[email protected]>, Deepak Kumar Barr <[email protected]> Closes #44 from pragya-mittal/feed-sla and squashes the following commits: c6c6f65 [Pragya] Review comments addressed 251d404 [Pragya] Resolved merge conflicts f5b2888 [Pragya] FALCON-1566 Add test for SLA monitoring API f037385 [Pragya] Merge branch 'master' of https://github.com/apache/falcon 4c19ec0 [Pragya] Merge branch 'master' of https://github.com/apache/falcon 3b7fd63 [Pragya] FALCON-1829 Add regression for submit and schedule process on native scheduler (time based)
1 parent 5440276 commit 830ab36

File tree

6 files changed

+293
-4
lines changed

6 files changed

+293
-4
lines changed

falcon-regression/CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ Trunk (Unreleased)
55
INCOMPATIBLE CHANGES
66

77
NEW FEATURES
8+
FALCON-1566 Add test for SLA monitoring API (Pragya Mittal)
9+
810
FALCON-1567 Test case for Lifecycle feature (Pragya Mittal)
911

1012
FALCON-1784 Add regression test for for FALCON-1647 (Paul Isaychuk)

falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ public ServiceResponse getEntityLineage(String params)
701701
if (StringUtils.isNotEmpty(params)){
702702
url += colo.isEmpty() ? "?" + params : "&" + params;
703703
}
704-
return Util.sendRequestLineage(createUrl(url), "get", null, null);
704+
return Util.sendJSONRequest(createUrl(url), "get", null, null);
705705
}
706706

707707
/**
@@ -715,4 +715,19 @@ public InstanceDependencyResult getInstanceDependencies(
715715
.createAndSendRequestProcessInstance(url, params, allColo, user);
716716
}
717717

718+
/**
719+
* Retrieves sla alerts.
720+
* @param params list of optional parameters
721+
* @return instances with sla missed.
722+
*/
723+
public ServiceResponse getSlaAlert(String params)
724+
throws URISyntaxException, AuthenticationException, InterruptedException, IOException {
725+
String url = createUrl(this.hostname + URLS.SLA.getValue(),
726+
getEntityType());
727+
if (StringUtils.isNotEmpty(params)) {
728+
url += params;
729+
}
730+
return Util.sendJSONRequest(createUrl(url), "get", null, null);
731+
}
732+
718733
}

falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.falcon.resource.EntityList;
2626
import org.apache.falcon.resource.EntitySummaryResult;
2727
import org.apache.falcon.resource.LineageGraphResult;
28+
import org.apache.falcon.resource.SchedulableEntityInstanceResult;
2829
import org.apache.http.HttpResponse;
2930
import org.apache.log4j.Logger;
3031

@@ -121,4 +122,13 @@ public LineageGraphResult getLineageGraphResult() {
121122
return lineageGraphResult;
122123
}
123124

125+
/**
126+
* Retrieves SchedulableEntityInstanceResult from a message if possible.
127+
* @return SchedulableEntityInstanceResult
128+
*/
129+
public SchedulableEntityInstanceResult getSlaResult() {
130+
SchedulableEntityInstanceResult schedulableEntityInstanceResult = new GsonBuilder().create().fromJson(message,
131+
SchedulableEntityInstanceResult.class);
132+
return schedulableEntityInstanceResult;
133+
}
124134
}

falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,7 @@ public enum URLS {
385385
STATUS_URL("/api/entities/status"),
386386
ENTITY_SUMMARY("/api/entities/summary"),
387387
SUBMIT_AND_SCHEDULE_URL("/api/entities/submitAndSchedule"),
388+
SLA("/api/entities/sla-alert"),
388389
ENTITY_LINEAGE("/api/metadata/lineage/entities"),
389390
INSTANCE_RUNNING("/api/instance/running"),
390391
INSTANCE_STATUS("/api/instance/status"),
@@ -595,7 +596,7 @@ public static Document convertStringToDocument(String xmlStr) {
595596
* @throws URISyntaxException
596597
* @throws AuthenticationException
597598
*/
598-
public static ServiceResponse sendRequestLineage(String url, String method, String data, String user)
599+
public static ServiceResponse sendJSONRequest(String url, String method, String data, String user)
599600
throws IOException, URISyntaxException, AuthenticationException, InterruptedException {
600601
BaseRequest request = new BaseRequest(url, method, user, data);
601602
request.addHeader(RequestKeys.CONTENT_TYPE_HEADER, RequestKeys.JSON_CONTENT_TYPE);
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.falcon.regression.SLA;
20+
21+
import org.apache.falcon.entity.v0.EntityType;
22+
import org.apache.falcon.entity.v0.Frequency;
23+
import org.apache.falcon.regression.Entities.FeedMerlin;
24+
import org.apache.falcon.regression.core.bundle.Bundle;
25+
import org.apache.falcon.regression.core.helpers.ColoHelper;
26+
import org.apache.falcon.regression.core.response.ServiceResponse;
27+
import org.apache.falcon.regression.core.util.HadoopUtil;
28+
import org.apache.falcon.regression.core.util.InstanceUtil;
29+
import org.apache.falcon.regression.core.util.TimeUtil;
30+
import org.apache.falcon.regression.core.util.AssertUtil;
31+
import org.apache.falcon.regression.core.util.BundleUtil;
32+
import org.apache.falcon.regression.testHelper.BaseTestClass;
33+
import org.apache.falcon.resource.SchedulableEntityInstance;
34+
import org.apache.falcon.resource.SchedulableEntityInstanceResult;
35+
import org.apache.hadoop.fs.FileSystem;
36+
import org.apache.log4j.Logger;
37+
import org.joda.time.DateTime;
38+
import org.testng.Assert;
39+
import org.testng.annotations.AfterMethod;
40+
import org.testng.annotations.BeforeMethod;
41+
import org.testng.annotations.Test;
42+
43+
import java.io.IOException;
44+
import java.util.List;
45+
import java.util.Arrays;
46+
import java.util.Comparator;
47+
import java.util.HashMap;
48+
import java.util.Map;
49+
import java.util.ArrayList;
50+
import java.util.Collections;
51+
52+
53+
/**
54+
* Feed SLA monitoring tests.
55+
* Test assumes following properties are set in startup.properties of server :
56+
* *.feed.sla.statusCheck.frequency.seconds=60
57+
* *.feed.sla.lookAheadWindow.millis=60000
58+
*/
59+
@Test(groups = { "distributed", "embedded" })
60+
public class FeedSLAMonitoringTest extends BaseTestClass {
61+
62+
private ColoHelper cluster = servers.get(0);
63+
private FileSystem clusterFS = serverFS.get(0);
64+
private String baseTestHDFSDir = cleanAndGetTestDir();
65+
private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN;
66+
private List<String> slaFeedNames;
67+
private List<Frequency> slaFeedFrequencies;
68+
private String clusterName;
69+
private static final Logger LOGGER = Logger.getLogger(FeedSLAMonitoringTest.class);
70+
71+
private String startTime;
72+
private String endTime;
73+
private String slaStartTime;
74+
private String slaEndTime;
75+
private int noOfFeeds;
76+
private int statusCheckFrequency;
77+
78+
private static final Comparator<SchedulableEntityInstance> DEPENDENCY_COMPARATOR =
79+
new Comparator<SchedulableEntityInstance>() {
80+
@Override
81+
public int compare(SchedulableEntityInstance o1, SchedulableEntityInstance o2) {
82+
return o1.compareTo(o2);
83+
}
84+
};
85+
86+
/**
87+
* Submitting 3 feeds with different frequencies and sla values.
88+
* @throws Exception
89+
*/
90+
@BeforeMethod(alwaysRun = true)
91+
public void setup() throws Exception {
92+
93+
bundles[0] = BundleUtil.readELBundle();
94+
bundles[0] = new Bundle(bundles[0], cluster);
95+
bundles[0].generateUniqueBundle(this);
96+
bundles[0].setInputFeedDataPath(feedInputPath);
97+
clusterName = bundles[0].getClusterNames().get(0);
98+
ServiceResponse response =
99+
prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0));
100+
AssertUtil.assertSucceeded(response);
101+
102+
startTime = TimeUtil.getTimeWrtSystemTime(-10);
103+
endTime = TimeUtil.addMinsToTime(startTime, 20);
104+
noOfFeeds=3;
105+
106+
LOGGER.info("Time range between : " + startTime + " and " + endTime);
107+
final String oldFeedName = bundles[0].getInputFeedNameFromBundle();
108+
slaFeedFrequencies = Arrays.asList(new Frequency("1", Frequency.TimeUnit.minutes),
109+
new Frequency("2", Frequency.TimeUnit.minutes),
110+
new Frequency("4", Frequency.TimeUnit.minutes));
111+
112+
slaFeedNames = Arrays.asList(oldFeedName + "-1", oldFeedName + "-2", oldFeedName + "-3");
113+
114+
//Submit 3 feeds with different frequencies and sla values.
115+
for (int bIndex = 0; bIndex < noOfFeeds; ++bIndex) {
116+
final FeedMerlin ipFeed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
117+
118+
ipFeed.setValidity(startTime, endTime);
119+
ipFeed.setAvailabilityFlag("_SUCCESS");
120+
121+
//set slaLow and slaHigh
122+
ipFeed.setSla(new Frequency("1", Frequency.TimeUnit.minutes),
123+
new Frequency("2", Frequency.TimeUnit.minutes));
124+
ipFeed.setName(slaFeedNames.get(bIndex));
125+
ipFeed.setFrequency(slaFeedFrequencies.get(bIndex));
126+
127+
LOGGER.info("Feed is : " + ipFeed.toString());
128+
129+
AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(ipFeed.toString()));
130+
}
131+
}
132+
133+
@AfterMethod(alwaysRun = true)
134+
public void tearDown() throws IOException {
135+
cleanTestsDirs();
136+
removeTestClassEntities();
137+
}
138+
139+
/**
140+
* The following test submits 3 feeds, checks the slaAlert for a given time range and validates its output.
141+
* It also checks the sla status when feed is deleted , data created with/without _SUCCESS folder.
142+
* @throws Exception
143+
*/
144+
@Test
145+
public void feedSLATest() throws Exception {
146+
/**TEST : Check sla response for a given time range
147+
*/
148+
149+
statusCheckFrequency=60; // 60 seconds
150+
151+
// Map of instanceDate and corresponding list of SchedulableEntityInstance
152+
Map<String, List<SchedulableEntityInstance>> instanceEntityMap = new HashMap<>();
153+
154+
slaStartTime = startTime;
155+
slaEndTime = TimeUtil.addMinsToTime(slaStartTime, 10);
156+
DateTime slaStartDate = TimeUtil.oozieDateToDate(slaStartTime);
157+
DateTime slaEndDate = TimeUtil.oozieDateToDate(slaEndTime);
158+
159+
List<SchedulableEntityInstance> expectedInstances = new ArrayList<>();
160+
SchedulableEntityInstance expectedSchedulableEntityInstance;
161+
162+
for (int index = 0; index < noOfFeeds; ++index) {
163+
164+
DateTime dt = new DateTime(slaStartDate);
165+
while (!dt.isAfter(slaEndDate)) {
166+
167+
expectedSchedulableEntityInstance = new SchedulableEntityInstance(slaFeedNames.get(index),
168+
clusterName, dt.toDate(), EntityType.FEED);
169+
expectedSchedulableEntityInstance.setTags("Missed SLA High");
170+
expectedInstances.add(expectedSchedulableEntityInstance);
171+
172+
if (!instanceEntityMap.containsKey(dt.toString())) {
173+
instanceEntityMap.put(dt.toString(), new ArrayList<SchedulableEntityInstance>());
174+
}
175+
instanceEntityMap.get(dt.toString()).add(expectedSchedulableEntityInstance);
176+
dt = dt.plusMinutes(slaFeedFrequencies.get(index).getFrequencyAsInt());
177+
178+
}
179+
}
180+
181+
TimeUtil.sleepSeconds(statusCheckFrequency);
182+
183+
SchedulableEntityInstanceResult response = prism.getFeedHelper().getSlaAlert(
184+
"?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
185+
186+
LOGGER.info(response.getMessage());
187+
188+
validateInstances(response, expectedInstances);
189+
190+
/**TEST : Create missing dependencies with _SUCCESS directory and check sla response
191+
*/
192+
193+
String dateEntry = (String) instanceEntityMap.keySet().toArray()[1];
194+
LOGGER.info(dateEntry + "/" + instanceEntityMap.get(dateEntry));
195+
List<String> dataDates = InstanceUtil.getMinuteDatesToPath(dateEntry, dateEntry, 0);
196+
197+
HadoopUtil.createFolders(clusterFS, baseTestHDFSDir + "/input/", dataDates);
198+
199+
//sla response for feeds when _SUCCESS file is missing from dataPath
200+
response = prism.getFeedHelper().getSlaAlert("?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
201+
202+
// Response does not change as it checks for _SUCCESS file
203+
validateInstances(response, expectedInstances);
204+
205+
//Create _SUCCESS file
206+
HadoopUtil.recreateDir(clusterFS, baseTestHDFSDir + "/input/" + dataDates.get(0) + "/_SUCCESS");
207+
for (SchedulableEntityInstance instance : instanceEntityMap.get(dateEntry)) {
208+
expectedInstances.remove(instance);
209+
}
210+
instanceEntityMap.remove(dateEntry);
211+
212+
TimeUtil.sleepSeconds(statusCheckFrequency);
213+
214+
//sla response for feeds when _SUCCESS file is available in dataPath
215+
response = prism.getFeedHelper().getSlaAlert("?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
216+
validateInstances(response, expectedInstances);
217+
218+
/** TEST : Delete feed and check sla response
219+
*/
220+
String deletedFeed = slaFeedNames.get(0);
221+
prism.getFeedHelper().deleteByName(deletedFeed, null);
222+
223+
for (Map.Entry<String, List<SchedulableEntityInstance>> entry : instanceEntityMap.entrySet())
224+
{
225+
LOGGER.info(entry.getKey() + "/" + entry.getValue());
226+
for (SchedulableEntityInstance instance : entry.getValue()) {
227+
if (instance.getEntityName().equals(deletedFeed)) {
228+
expectedInstances.remove(instance);
229+
}
230+
}
231+
232+
}
233+
TimeUtil.sleepSeconds(statusCheckFrequency);
234+
response = prism.getFeedHelper().getSlaAlert("?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
235+
validateInstances(response, expectedInstances);
236+
237+
}
238+
239+
/**
240+
* Validating expected response with actual response.
241+
* @param response SchedulableEntityInstanceResult response
242+
* @param expectedInstances List of expected instances
243+
*/
244+
private static void validateInstances(SchedulableEntityInstanceResult response,
245+
List<SchedulableEntityInstance> expectedInstances) {
246+
247+
List<SchedulableEntityInstance> actualInstances = Arrays.asList(response.getInstances());
248+
249+
for (SchedulableEntityInstance instance : actualInstances) {
250+
instance.setTags("Missed SLA High");
251+
}
252+
253+
Collections.sort(expectedInstances, DEPENDENCY_COMPARATOR);
254+
Collections.sort(actualInstances, DEPENDENCY_COMPARATOR);
255+
256+
Assert.assertEquals(actualInstances, expectedInstances, "Instances mismatch for");
257+
}
258+
}

falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121
import org.apache.falcon.regression.core.bundle.Bundle;
2222
import org.apache.falcon.regression.core.helpers.ColoHelper;
2323
import org.apache.falcon.regression.core.response.ServiceResponse;
24-
import org.apache.falcon.regression.core.util.*;
24+
import org.apache.falcon.regression.core.util.AssertUtil;
25+
import org.apache.falcon.regression.core.util.BundleUtil;
26+
import org.apache.falcon.regression.core.util.OSUtil;
27+
import org.apache.falcon.regression.core.util.TimeUtil;
28+
import org.apache.falcon.regression.core.util.Util;
2529
import org.apache.falcon.regression.testHelper.BaseTestClass;
2630
import org.apache.log4j.Logger;
27-
import org.apache.oozie.client.OozieClient;
2831
import org.testng.annotations.AfterMethod;
2932
import org.testng.annotations.BeforeClass;
3033
import org.testng.annotations.BeforeMethod;

0 commit comments

Comments
 (0)