Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -3998,6 +3998,15 @@ public static enum ConfVars {
"true", new StringSet("true", "false", "ignore"),
"Whether Tez session pool should allow submitting queries to custom queues. The options\n" +
"are true, false (error out), ignore (accept the query but ignore the queue setting)."),
HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS("hive.server2.use.external.sessions", false,
"This flag is used in HiveServer2 to use externally started tez sessions"),
HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE("hive.server2.tez.external.sessions.namespace", "",
"ZK namespace to use for tez external sessions"),
HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS("hive.server2.tez.external.sessions.wait.max.attempts",
60, "Number of attempts before giving up waiting for external sessions (each attempt is 1 sec long)"),
HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS("hive.server2.tez.external.sessions.registry.class",
"org.apache.hadoop.hive.ql.exec.tez.DummyExternalSessionsRegistry", "Tez external sessions\n" +
"registry implementation to use"),
HIVE_MAPRED_JOB_FOLLOW_TEZ_QUEUE("hive.mapred.job.follow.tez.queue", false,
"Whether the MR jobs initiated by a query should be enforced to run in the queue denoted by "
+ "'tez.queue.name', e.g. DistCp jobs."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
import org.apache.hadoop.hive.ql.exec.tez.TezSession;
import org.apache.hadoop.hive.ql.lockmgr.zookeeper.CuratorFrameworkSingleton;
import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
import org.apache.hadoop.hive.ql.session.SessionState;
Expand Down Expand Up @@ -358,7 +358,7 @@ public void restartSessions(boolean canReuseSession, CliSessionState ss, Session
if (oldSs != null && canReuseSession
&& clusterType.getCoreClusterType() == CoreClusterType.TEZ) {
// Copy the tezSessionState from the old CliSessionState.
TezSessionState tezSessionState = oldSs.getTezSession();
TezSession tezSessionState = oldSs.getTezSession();
oldSs.setTezSession(null);
ss.setTezSession(tezSessionState);
oldSs.close();
Expand Down
1 change: 0 additions & 1 deletion ql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hive.ql.ddl.process.kill;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ddl.DDLOperation;
import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
Expand All @@ -37,7 +38,12 @@ public KillQueriesOperation(DDLOperationContext context, KillQueriesDesc desc) {
public int execute() throws HiveException {
SessionState sessionState = SessionState.get();
for (String queryId : desc.getQueryIds()) {
sessionState.getKillQuery().killQuery(queryId, KILL_QUERY_MESSAGE, context.getDb().getConf());
// For now, get the config setting here; we can only have one type of the session present.
// Ideally we should check each session separately.
boolean isExternal = HiveConf.getBoolVar(context.getDb().getConf(),
HiveConf.ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS);
sessionState.getKillQuery().killQuery(queryId, KILL_QUERY_MESSAGE, context.getDb().getConf(),
!isExternal);
}
LOG.info("kill query called ({})", desc.getQueryIds());
return 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.ql.exec.tez;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public abstract class AbstractTriggerValidator {
private ScheduledExecutorService scheduledExecutorService = null;
abstract Runnable getTriggerValidatorRunnable();

void startTriggerValidator(long triggerValidationIntervalMs) {
if (scheduledExecutorService == null) {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build());
Runnable triggerValidatorRunnable = getTriggerValidatorRunnable();
scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs,
triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
TezSessionPoolSession.LOG.info("Started trigger validator with interval: {} ms", triggerValidationIntervalMs);
}
}

void stopTriggerValidator() {
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
scheduledExecutorService = null;
TezSessionPoolSession.LOG.info("Stopped trigger validator");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.exec.tez;


import org.apache.hadoop.hive.conf.HiveConf;

public class DummyExternalSessionsRegistry implements ExternalSessionsRegistry {

// This constructor is required. Reflective instantiation will invoke this constructor.
public DummyExternalSessionsRegistry(HiveConf conf) {
}

@Override
public String getSession() throws Exception {
throw new UnsupportedOperationException("not supported in dummy external session registry");
}

@Override
public void returnSession(final String appId) {
throw new UnsupportedOperationException("not supported in dummy external session registry");
}

@Override
public void close() {
throw new UnsupportedOperationException("not supported in dummy external session registry");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.exec.tez;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface ExternalSessionsRegistry {
Logger LOG = LoggerFactory.getLogger(ExternalSessionsRegistry.class);

/**
* Returns application of id of the external session.
* @return application id
* @throws Exception in case of any exceptions
*/
String getSession() throws Exception;

Check warning on line 38 in ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace generic exceptions with specific library exceptions or a custom exception.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZyzPfkHRid15y8Zobpi&open=AZyzPfkHRid15y8Zobpi&pullRequest=6343

/**
* Returns external session back to registry.
* @param appId application id
*/
void returnSession(String appId);

/**

Check warning on line 46 in ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

First sentence should end with a period.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZyzPfkHRid15y8Zobpq&open=AZyzPfkHRid15y8Zobpq&pullRequest=6343
* Closes the external session registry
*/
void close();

Map<String, ExternalSessionsRegistry> INSTANCES = new HashMap<>();

Check warning on line 51 in ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Move "INSTANCES" to a class and lower its visibility

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZyzPfkHRid15y8Zobpk&open=AZyzPfkHRid15y8Zobpk&pullRequest=6343

static ExternalSessionsRegistry getClient(final Configuration conf) {
ExternalSessionsRegistry registry;
synchronized (INSTANCES) {
// TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE after Tez 1.0.0 is released

Check warning on line 56 in ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this TODO comment.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZyzPfkHRid15y8Zobpl&open=AZyzPfkHRid15y8Zobpl&pullRequest=6343

Check warning on line 56 in ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Comment matches to-do format 'TODO:'.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZyzPfkHRid15y8Zobpr&open=AZyzPfkHRid15y8Zobpr&pullRequest=6343
String namespace = conf.get("tez.am.registry.namespace");
// HS2 would need to know about all coordinators running on all compute groups for a given compute (namespace)
// Setting this config to false in client, will make registry client listen on paths under @compute instead of
// @compute/compute-group
// TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_ENABLE_COMPUTE_GROUPS after Tez 1.0.0 is released

Check warning on line 61 in ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this TODO comment.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZyzPfkHRid15y8Zobpm&open=AZyzPfkHRid15y8Zobpm&pullRequest=6343
conf.setBoolean("tez.am.registry.enable.compute.groups", false);
registry = INSTANCES.get(namespace);
String clazz = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS);
if (registry == null) {
try {
registry = JavaUtils.newInstance(JavaUtils.getClass(clazz, ExternalSessionsRegistry.class),
new Class<?>[]{HiveConf.class}, new Object[]{conf});
} catch (MetaException e) {
throw new RuntimeException(e);

Check warning on line 70 in ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace generic exceptions with specific library exceptions or a custom exception.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZyzPfkHRid15y8Zobpj&open=AZyzPfkHRid15y8Zobpj&pullRequest=6343
}
INSTANCES.put(namespace, registry);
}
LOG.info("Returning tez external AM registry ({}) for namespace '{}'", System.identityHashCode(registry),
namespace);

Check warning on line 75 in ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

'namespace' has incorrect indentation level 8, expected level should be 10.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZyzPfkHRid15y8Zobpt&open=AZyzPfkHRid15y8Zobpt&pullRequest=6343
}
return registry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.KillQuery;
import org.apache.hadoop.hive.ql.wm.Action;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
Expand All @@ -33,35 +32,33 @@
/**
* Handles only Kill Action.
*/
public class KillTriggerActionHandler implements TriggerActionHandler<TezSessionState> {
public class KillTriggerActionHandler implements TriggerActionHandler<TezSession> {
private static final Logger LOG = LoggerFactory.getLogger(KillTriggerActionHandler.class);
private final HiveConf conf;

public KillTriggerActionHandler() {
this.conf = new HiveConf();
this.conf = new HiveConf();
}

@Override
public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) {
for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) {
if (entry.getValue().getAction().getType() == Action.Type.KILL_QUERY) {
TezSessionState sessionState = entry.getKey();
String queryId = sessionState.getWmContext().getQueryId();
try {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
DriverUtils.setUpAndStartSessionState(conf, ugi.getShortUserName());
KillQuery killQuery = sessionState.getKillQuery();
// if kill query is null then session might have been released to pool or closed already
if (killQuery != null) {
sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg(),
sessionState.getConf());
}
} catch (HiveException | IOException e) {
LOG.warn("Unable to kill query {} for trigger violation", queryId);
}
} else {
throw new RuntimeException("Unsupported action: " + entry.getValue());
public void applyAction(Map<TezSession, Trigger> queriesViolated) {
for (Map.Entry<TezSession, Trigger> entry : queriesViolated.entrySet()) {
if (entry.getValue().getAction().getType() == Action.Type.KILL_QUERY) {
TezSession sessionState = entry.getKey();
String queryId = sessionState.getWmContext().getQueryId();
try {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
DriverUtils.setUpAndStartSessionState(conf, ugi.getShortUserName());
boolean wasKilled = sessionState.killQuery(entry.getValue().getViolationMsg());
if (!wasKilled) {
LOG.info("Didn't kill the query {}", queryId);
}
} catch (HiveException | IOException e) {
LOG.warn("Unable to kill query {} for trigger violation", queryId);
}
} else {
throw new RuntimeException("Unsupported action: " + entry.getValue());
}
}
}
}
Loading