Skip to content

Commit bab98b6

Browse files
authored
[Improve][Engine] Improve flink engine (#510)
1 parent fdb9d32 commit bab98b6

File tree

7 files changed

+152
-50
lines changed

7 files changed

+152
-50
lines changed

datavines-common/src/main/java/io/datavines/common/CommonConstants.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,4 +300,20 @@ public class CommonConstants {
300300
public static final String CATALOG_ENTITY_INSTANCE_STATUS_ACTIVE = "active";
301301

302302
public static final String CATALOG_ENTITY_INSTANCE_STATUS_DELETED = "deleted";
303+
304+
public static final String WEEK_START_DAY = "week_start_day";
305+
306+
public static final String WEEK_END_DAY = "week_end_day";
307+
308+
public static final String MONTH_START_DAY = "month_start_day";
309+
310+
public static final String MONTH_END_DAY = "month_end_day";
311+
312+
public static final String DAY_START_TIME = "day_start_time";
313+
314+
public static final String DAY_END_TIME = "day_end_time";
315+
316+
public static final String DAY_AFTER_7_END_TIME = "day_after_7_end_time";
317+
318+
public static final String DAY_AFTER_30_END_TIME = "day_after_30_end_time";
303319
}

datavines-common/src/main/java/io/datavines/common/utils/CommonPropertyUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ public static void setValue(String key, String value) {
329329
public static void remove(String key) {
330330
PROPERTIES.remove(key);
331331
}
332+
332333
public static Map<String, String> getPropertiesByPrefix(String prefix) {
333334
if (StringUtils.isEmpty(prefix)) {
334335
return null;

datavines-common/src/main/java/io/datavines/common/utils/DateUtils.java

Lines changed: 103 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@
1616
*/
1717
package io.datavines.common.utils;
1818

19-
import java.time.Instant;
20-
import java.time.LocalDateTime;
21-
import java.time.ZoneId;
19+
import java.time.*;
2220
import java.time.format.DateTimeFormatter;
21+
import java.time.temporal.TemporalAdjusters;
2322
import java.util.Calendar;
2423
import java.util.Date;
2524
import java.util.Objects;
@@ -43,6 +42,7 @@ public class DateUtils {
4342
public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
4443

4544
public static final String YYYYMMDD = "yyyyMMdd";
45+
4646
public static final String YYYY_MM_DD = "yyyy-MM-dd";
4747

4848
/**
@@ -107,6 +107,17 @@ public static String format(LocalDateTime localDateTime, String format) {
107107
return localDateTime.format(DateTimeFormatter.ofPattern(format));
108108
}
109109

110+
/**
111+
* get the formatted date string
112+
*
113+
* @param localDate local data
114+
* @param format yyyy-MM-dd HH:mm:ss
115+
* @return date string
116+
*/
117+
public static String format(LocalDate localDate, String format) {
118+
return localDate.format(DateTimeFormatter.ofPattern(format));
119+
}
120+
110121
/**
111122
* convert time to yyyy-MM-dd HH:mm:ss format
112123
*
@@ -117,7 +128,6 @@ public static String dateToString(Date date) {
117128
return format(date, YYYY_MM_DD_HH_MM_SS);
118129
}
119130

120-
121131
/**
122132
* convert string to date and time
123133
*
@@ -135,7 +145,6 @@ public static Date parse(String date, String format) {
135145
return null;
136146
}
137147

138-
139148
/**
140149
* convert date str to yyyy-MM-dd HH:mm:ss format
141150
*
@@ -171,7 +180,6 @@ public static long differMs(Date d1, Date d2) {
171180
return Math.abs(d1.getTime() - d2.getTime());
172181
}
173182

174-
175183
/**
176184
* get hours between two dates
177185
*
@@ -198,7 +206,6 @@ public static long diffMin(Date d1, Date d2) {
198206
return (long) Math.ceil(differSec(d1, d2) / 60.0);
199207
}
200208

201-
202209
/**
203210
* get the date of the specified date in the days before and after
204211
*
@@ -248,7 +255,6 @@ public static String format2Readable(long ms) {
248255
long seconds = (ms % (1000 * 60)) / 1000;
249256

250257
return String.format("%02d %02d:%02d:%02d", days, hours, minutes, seconds);
251-
252258
}
253259

254260
/**
@@ -302,38 +308,109 @@ public static Date getFirstDayOfMonth(Date date) {
302308
}
303309

304310
/**
305-
* get some hour of day
311+
* get last day of month
306312
*
307-
* @param date date
308-
* @param hours hours
309-
* @return some hour of day
310-
* */
311-
public static Date getSomeHourOfDay(Date date, int hours) {
313+
* @param date date
314+
* @return get last day of month
315+
*/
316+
public static Date getLastDayOfMonth(Date date) {
312317
Calendar cal = Calendar.getInstance();
313318

314319
cal.setTime(date);
315-
cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) - hours);
316-
cal.set(Calendar.MINUTE, 0);
317-
cal.set(Calendar.SECOND, 0);
318-
cal.set(Calendar.MILLISECOND, 0);
320+
321+
cal.add(Calendar.MONTH, 1);
322+
cal.set(Calendar.DAY_OF_MONTH, 1);
323+
cal.add(Calendar.DAY_OF_MONTH, -1);
319324

320325
return cal.getTime();
321326
}
322327

323328
/**
324-
* get last day of month
329+
* get first day of week
325330
*
326-
* @param date date
327-
* @return get last day of month
331+
* @param date date
332+
* @return first day of week
328333
*/
329-
public static Date getLastDayOfMonth(Date date) {
334+
public static LocalDate getWeekStart(LocalDate date) {
335+
return date.with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
336+
}
337+
338+
/**
339+
* get last day of week
340+
* @param date
341+
* @return
342+
*/
343+
public static LocalDate getWeekEnd(LocalDate date) {
344+
LocalDate startOfWeek = date.with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
345+
return startOfWeek.with(TemporalAdjusters.nextOrSame(DayOfWeek.SUNDAY));
346+
}
347+
348+
/**
349+
* get the first day of the month
350+
*
351+
* @param date date
352+
* @return the first day of the month
353+
*/
354+
public static LocalDate getMonthStart(LocalDate date) {
355+
return date.withDayOfMonth(1);
356+
}
357+
358+
/**
359+
* get the last day of the month
360+
*
361+
* @param date date
362+
* @return the last day of the month
363+
*/
364+
public static LocalDate getMonthEnd(LocalDate date) {
365+
return date.with(TemporalAdjusters.lastDayOfMonth());
366+
}
367+
368+
/**
369+
* get the first time of the day
370+
*
371+
* @param date date
372+
* @return the first day of the year
373+
*/
374+
public static LocalDateTime getStartOfDay(LocalDate date) {
375+
return date.atStartOfDay();
376+
}
377+
378+
/**
379+
* get the last time of the day
380+
*
381+
* @param date date
382+
* @return the last day of the year
383+
*/
384+
public static LocalDateTime getEndOfDay(LocalDate date) {
385+
return date.atTime(LocalTime.MAX);
386+
}
387+
388+
/**
389+
* get the first time of the day after n days
390+
*
391+
* @param date date
392+
* @param n n
393+
* @return the first day of the year after n days
394+
*/
395+
public static LocalDateTime getEndOfDayAfterNDays(LocalDate date, int n) {
396+
return date.plusDays(n).atTime(LocalTime.MAX);
397+
}
398+
399+
/**
400+
* get some hour of day
401+
*
402+
* @param date date
403+
* @param hours hours
404+
* @return some hour of day
405+
* */
406+
public static Date getSomeHourOfDay(Date date, int hours) {
330407
Calendar cal = Calendar.getInstance();
331408

332409
cal.setTime(date);
333-
334-
cal.add(Calendar.MONTH, 1);
335-
cal.set(Calendar.DAY_OF_MONTH, 1);
336-
cal.add(Calendar.DAY_OF_MONTH, -1);
410+
cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) - hours);
411+
cal.set(Calendar.MINUTE, 0);
412+
cal.set(Calendar.SECOND, 0);
413+
cal.set(Calendar.MILLISECOND, 0);
337414

338415
return cal.getTime();
339416
}

datavines-connector/datavines-connector-plugins/datavines-connector-flink/src/main/java/io/datavines/connector/plugin/FlinkMetricScript.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,36 +39,36 @@ public String columnNotMatchRegex() {
3939
public String dailyAvg(String uniqueKey) {
4040
return "SELECT ROUND(AVG(actual_value), 2) AS expected_value_" + uniqueKey +
4141
" FROM md_dv_actual_values" +
42-
" WHERE data_time >= TIMESTAMP ${data_time}" +
43-
" AND data_time < TIMESTAMP ${data_time} + INTERVAL '1' DAY" +
42+
" WHERE data_time >= '${day_start_time}'" +
43+
" AND data_time < '${day_end_time}'" +
4444
" AND unique_code = ${unique_code}";
4545
}
4646

4747
@Override
4848
public String last7DayAvg(String uniqueKey) {
4949
return "select round(avg(actual_value),2) as expected_value_" + uniqueKey +
50-
" from dv_actual_values where data_time >= TIMESTAMP ${data_time} + INTERVAL '-7' DAY" +
51-
" and data_time < TIMESTAMP ${data_time} + INTERVAL '1' DAY and unique_code = ${unique_code}";
50+
" from md_dv_actual_values where data_time >= '${day_start_time}'" +
51+
" and data_time < '${day_after_7_end_time}' and unique_code = ${unique_code}";
5252
}
5353

5454
@Override
5555
public String last30DayAvg(String uniqueKey) {
5656
return "select round(avg(actual_value),2) as expected_value_" + uniqueKey +
57-
" from dv_actual_values where data_time >= TIMESTAMP ${data_time} + INTERVAL '-30' DAY" +
58-
" and data_time < TIMESTAMP ${data_time} + INTERVAL '1' DAY and unique_code = ${unique_code}";
57+
" from md_dv_actual_values where data_time >= '${day_start_time}'" +
58+
" and data_time < '${day_after_30_end_time}' and unique_code = ${unique_code}";
5959
}
6060

6161
@Override
6262
public String monthlyAvg(String uniqueKey) {
6363
return "select round(avg(actual_value),2) as expected_value_" + uniqueKey +
64-
" from dv_actual_values where data_time >= DATE_TRUNC('MONTH', TIMESTAMP ${data_time})" +
65-
" and data_time < DATE_TRUNC('MONTH', TIMESTAMP ${data_time}) + INTERVAL '1' MONTH - INTERVAL '1' DAY and unique_code = ${unique_code}";
64+
" from md_dv_actual_values where data_time >= '${month_start_day} 00:00:00'" +
65+
" and data_time <= '${month_end_day} 23:59:59' and unique_code = ${unique_code}";
6666
}
6767

6868
@Override
6969
public String weeklyAvg(String uniqueKey) {
7070
return "select round(avg(actual_value),2) as expected_value_" + uniqueKey +
71-
" from dv_actual_values where data_time >= DATE_TRUNC('WEEK', TIMESTAMP ${data_time})" +
72-
" and data_time < DATE_TRUNC('WEEK', TIMESTAMP ${data_time}) + INTERVAL '1' WEEK - INTERVAL '1' DAY and unique_code = ${unique_code}";
71+
" from md_dv_actual_values where data_time >= '${week_start_day} 00:00:00'"+
72+
" and data_time <= '${week_end_day} 23:59:59' and unique_code = ${unique_code}";
7373
}
7474
}

datavines-engine/datavines-engine-config/src/main/java/io/datavines/engine/config/BaseJobConfigurationBuilder.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,7 @@
2222
import io.datavines.common.entity.*;
2323
import io.datavines.common.entity.job.BaseJobParameter;
2424
import io.datavines.common.exception.DataVinesException;
25-
import io.datavines.common.utils.CommonPropertyUtils;
26-
import io.datavines.common.utils.JSONUtils;
27-
import io.datavines.common.utils.ParameterUtils;
28-
import io.datavines.common.utils.StringUtils;
29-
import io.datavines.common.utils.placeholder.PlaceholderUtils;
25+
import io.datavines.common.utils.*;
3026
import io.datavines.connector.api.ConnectorFactory;
3127
import io.datavines.metric.api.ExpectedValue;
3228
import io.datavines.metric.api.SqlMetric;
@@ -35,12 +31,15 @@
3531
import org.apache.commons.codec.digest.DigestUtils;
3632
import org.apache.commons.collections4.CollectionUtils;
3733

34+
import java.time.LocalDate;
3835
import java.util.ArrayList;
3936
import java.util.HashMap;
4037
import java.util.List;
4138
import java.util.Map;
4239

40+
import static io.datavines.common.CommonConstants.*;
4341
import static io.datavines.common.ConfigConstants.*;
42+
import static io.datavines.common.ConfigConstants.TABLE;
4443
import static io.datavines.engine.config.MetricParserUtils.generateUniqueCode;
4544

4645
public abstract class BaseJobConfigurationBuilder implements JobConfigurationBuilder {
@@ -58,6 +57,15 @@ public abstract class BaseJobConfigurationBuilder implements JobConfigurationBui
5857
@Override
5958
public void init(Map<String, String> inputParameter, JobExecutionInfo jobExecutionInfo) {
6059
this.inputParameter = inputParameter;
60+
LocalDate nowDate = LocalDate.now();
61+
this.inputParameter.put(WEEK_START_DAY, DateUtils.format(DateUtils.getWeekStart(nowDate), DateUtils.YYYY_MM_DD));
62+
this.inputParameter.put(WEEK_END_DAY, DateUtils.format(DateUtils.getWeekEnd(nowDate), DateUtils.YYYY_MM_DD));
63+
this.inputParameter.put(MONTH_START_DAY, DateUtils.format(DateUtils.getMonthStart(nowDate), DateUtils.YYYY_MM_DD));
64+
this.inputParameter.put(MONTH_END_DAY, DateUtils.format(DateUtils.getMonthEnd(nowDate), DateUtils.YYYY_MM_DD));
65+
this.inputParameter.put(DAY_START_TIME, DateUtils.format(DateUtils.getStartOfDay(nowDate), DateUtils.YYYY_MM_DD_HH_MM_SS));
66+
this.inputParameter.put(DAY_END_TIME, DateUtils.format(DateUtils.getEndOfDay(nowDate), DateUtils.YYYY_MM_DD_HH_MM_SS));
67+
this.inputParameter.put(DAY_AFTER_7_END_TIME, DateUtils.format(DateUtils.getEndOfDayAfterNDays(nowDate,7), DateUtils.YYYY_MM_DD_HH_MM_SS));
68+
this.inputParameter.put(DAY_AFTER_30_END_TIME, DateUtils.format(DateUtils.getEndOfDayAfterNDays(nowDate,30), DateUtils.YYYY_MM_DD_HH_MM_SS));
6169
this.inputParameter.put(COLUMN, "");
6270
this.jobExecutionInfo = jobExecutionInfo;
6371
this.jobExecutionParameter = jobExecutionInfo.getJobExecutionParameter();

datavines-ui/Editor/components/MetricModal/ActuatorConfigure/index.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ const Index = ({ form, detail }: InnerProps) => {
148148
<Radio.Group>
149149
{/*<Radio value="local">{intl.formatMessage({ id: 'dv_flink_deploy_mode_local' })}</Radio>*/}
150150
<Radio value="yarn-session">{intl.formatMessage({ id: 'dv_flink_deploy_mode_yarn_session' })}</Radio>
151-
<Radio value="yarn-per-job">{intl.formatMessage({ id: 'dv_flink_deploy_mode_yarn_per_job' })}</Radio>
151+
{/*<Radio value="yarn-per-job">{intl.formatMessage({ id: 'dv_flink_deploy_mode_yarn_per_job' })}</Radio>*/}
152152
<Radio value="yarn-application">{intl.formatMessage({ id: 'dv_flink_deploy_mode_yarn_application' })}</Radio>
153153
</Radio.Group>
154154
</Form.Item>

pom.xml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -852,14 +852,14 @@
852852
</build>
853853

854854
<repositories>
855-
<!-- <repository>-->
856-
<!-- <id>public</id>-->
857-
<!-- <name>aliyun nexus</name>-->
858-
<!-- <url>http://maven.aliyun.com/nexus/content/groups/public/</url>-->
859-
<!-- <releases>-->
860-
<!-- <enabled>true</enabled>-->
861-
<!-- </releases>-->
862-
<!-- </repository>-->
855+
<repository>
856+
<id>public</id>
857+
<name>aliyun nexus</name>
858+
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
859+
<releases>
860+
<enabled>true</enabled>
861+
</releases>
862+
</repository>
863863
<repository>
864864
<id>apache.snapshots</id>
865865
<name>Apache Development Snapshot Repository</name>

0 commit comments

Comments
 (0)