Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void testListPartitionNamesNoDb() throws Exception {
super.testListPartitionNamesNoDb();
}

@Test
@Test(expected = NoSuchObjectException.class)
@Override
public void testListPartitionsAllNoTable() throws Exception {
super.testListPartitionsAllNoTable();
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,6 @@ DataConnector get_dataconnector_core(final String name)
AbortCompactResponse abort_Compactions(AbortCompactionRequest rqst) throws TException;

IMetaStoreMetadataTransformer getMetadataTransformer();

/**
 * NOTE(review): presumably returns the {@link MetaStoreFilterHook} used to filter
 * metastore objects before they are handed back to clients — confirm against the
 * implementing handler.
 */
MetaStoreFilterHook getMetaFilterHook();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@

import java.util.List;

import org.apache.hadoop.hive.metastore.api.GetPartitionRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionsByFilterRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthRequest;
import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
import org.apache.hadoop.hive.metastore.api.PartitionsRequest;

public class GetPartitionsArgs {
private String filter;
private byte[] expr;
Expand Down Expand Up @@ -183,6 +190,51 @@ public GetPartitionsArgs build() {
}
}

/**
 * Builds a {@link GetPartitionsArgs} from a {@link GetPartitionsByNamesRequest},
 * carrying over the partition names, the column-schema skipping flag and the
 * include/exclude parameter-key patterns.
 */
public static GetPartitionsArgs from(GetPartitionsByNamesRequest gpbnr) {
  GetPartitionsArgsBuilder builder = new GetPartitionsArgsBuilder();
  builder = builder.partNames(gpbnr.getNames());
  builder = builder.skipColumnSchemaForPartition(gpbnr.isSkipColumnSchemaForPartition());
  builder = builder.excludeParamKeyPattern(gpbnr.getExcludeParamKeyPattern());
  builder = builder.includeParamKeyPattern(gpbnr.getIncludeParamKeyPattern());
  return builder.build();
}

/**
 * Builds a {@link GetPartitionsArgs} from a {@link GetPartitionsPsWithAuthRequest},
 * carrying over partition values/names, the max-parts cap, the requesting user and
 * groups (for authorization), and the column-schema/parameter-pattern options.
 */
public static GetPartitionsArgs from(GetPartitionsPsWithAuthRequest req) {
  GetPartitionsArgsBuilder builder = new GetPartitionsArgsBuilder();
  builder = builder.part_vals(req.getPartVals());
  builder = builder.max(req.getMaxParts());
  builder = builder.userName(req.getUserName());
  builder = builder.groupNames(req.getGroupNames());
  builder = builder.skipColumnSchemaForPartition(req.isSkipColumnSchemaForPartition());
  builder = builder.includeParamKeyPattern(req.getIncludeParamKeyPattern());
  builder = builder.excludeParamKeyPattern(req.getExcludeParamKeyPattern());
  builder = builder.partNames(req.getPartNames());
  return builder.build();
}

/**
 * Builds a {@link GetPartitionsArgs} from a {@link GetPartitionsByFilterRequest},
 * carrying over the filter string, the max-parts cap and the
 * column-schema/parameter-pattern options.
 */
public static GetPartitionsArgs from(GetPartitionsByFilterRequest req) {
  GetPartitionsArgsBuilder builder = new GetPartitionsArgsBuilder();
  builder = builder.filter(req.getFilter());
  builder = builder.max(req.getMaxParts());
  builder = builder.skipColumnSchemaForPartition(req.isSkipColumnSchemaForPartition());
  builder = builder.excludeParamKeyPattern(req.getExcludeParamKeyPattern());
  builder = builder.includeParamKeyPattern(req.getIncludeParamKeyPattern());
  return builder.build();
}

/**
 * Builds a {@link GetPartitionsArgs} from a {@link PartitionsByExprRequest},
 * carrying over the serialized filter expression, the default partition name,
 * the max-parts cap and the column-schema/parameter-pattern options.
 */
public static GetPartitionsArgs from(PartitionsByExprRequest req) {
  GetPartitionsArgsBuilder builder = new GetPartitionsArgsBuilder();
  builder = builder.expr(req.getExpr());
  builder = builder.defaultPartName(req.getDefaultPartitionName());
  builder = builder.max(req.getMaxParts());
  builder = builder.skipColumnSchemaForPartition(req.isSkipColumnSchemaForPartition());
  builder = builder.excludeParamKeyPattern(req.getExcludeParamKeyPattern());
  builder = builder.includeParamKeyPattern(req.getIncludeParamKeyPattern());
  return builder.build();
}

/**
 * Builds a {@link GetPartitionsArgs} from a plain {@link PartitionsRequest},
 * carrying over only the parameter-pattern options, the column-schema skipping
 * flag and the max-parts cap.
 */
public static GetPartitionsArgs from(PartitionsRequest req) {
  GetPartitionsArgsBuilder builder = new GetPartitionsArgsBuilder();
  builder = builder.includeParamKeyPattern(req.getIncludeParamKeyPattern());
  builder = builder.excludeParamKeyPattern(req.getExcludeParamKeyPattern());
  builder = builder.skipColumnSchemaForPartition(req.isSkipColumnSchemaForPartition());
  builder = builder.max(req.getMaxParts());
  return builder.build();
}

/**
 * Builds a {@link GetPartitionsArgs} identifying a single partition by its
 * explicit partition values from a {@link GetPartitionRequest}.
 */
public static GetPartitionsArgs from(GetPartitionRequest req) {
  GetPartitionsArgsBuilder single = new GetPartitionsArgsBuilder().part_vals(req.getPartVals());
  return single.build();
}

/**
 * Builds args requesting every partition: no filter, and max set to -1
 * (presumably meaning "no cap on the number of partitions returned" —
 * NOTE(review): confirm the -1 semantics in the builder/consumers).
 */
public static GetPartitionsArgs getAllPartitions() {
  GetPartitionsArgsBuilder unlimited = new GetPartitionsArgsBuilder();
  return unlimited.max(-1).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_IN_TEST;
import static org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance;

@SuppressWarnings("rawtypes")
public abstract class AbstractRequestHandler<T extends TBase, A extends AbstractRequestHandler.Result> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRequestHandler.class);
private static final Map<String, AbstractRequestHandler> ID_TO_HANDLER = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -185,10 +186,6 @@ public RequestStatus getRequestStatus() throws TException {
protected A execute() throws TException, IOException {
throw new UnsupportedOperationException();
}
@Override
public String getMessagePrefix() {
throw new UnsupportedOperationException();
}
};
}
}
Expand All @@ -213,7 +210,7 @@ public static <T extends AbstractRequestHandler> T offer(IHMSHandler handler, TB
}

public RequestStatus getRequestStatus() throws TException {
String logMsgPrefix = getMessagePrefix();
String logMsgPrefix = toString();
if (future == null) {
throw new IllegalStateException(logMsgPrefix + " hasn't started yet");
}
Expand Down Expand Up @@ -271,7 +268,7 @@ public void cancelRequest() {
if (!future.isDone()) {
future.cancel(true);
aborted.set(true);
LOG.warn("{} is still running, but a close signal is sent out", getMessagePrefix());
LOG.warn("{} is still running, but a close signal is sent out", this);
}
executor.shutdown();
}
Expand All @@ -287,7 +284,7 @@ public final A getResult() throws TException {
RequestStatus resp = getRequestStatus();
if (!resp.finished) {
throw new IllegalStateException("Result is un-available as " +
getMessagePrefix() + " is still running");
this + " is still running");
}
return (A) result;
}
Expand Down Expand Up @@ -318,13 +315,6 @@ protected void afterExecute(A result) throws TException, IOException {
request = null;
}

/**
* Get the prefix for logging the message on polling the handler's status.
*
* @return message prefix
*/
protected abstract String getMessagePrefix();

/**
* Get the handler's progress that will show at the client.
*
Expand All @@ -350,7 +340,7 @@ private String getMetricAlias() {

public void checkInterrupted() throws MetaException {
if (aborted.get()) {
throw new MetaException(getMessagePrefix() + " has been interrupted");
throw new MetaException(this + " has been interrupted");
}
}

Expand Down Expand Up @@ -380,6 +370,11 @@ default Result shrinkIfNecessary() {
}
}

@Override
public String toString() {
// Default handler label used in logs and error messages: simple class name
// plus the handler's request id, e.g. "CreateTableHandler [42]".
return getClass().getSimpleName() + " [" + id + "]";
}

private static boolean validateHandler(Class<? extends AbstractRequestHandler> clz) {
if (Modifier.isAbstract(clz.getModifiers())) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ private boolean createLocationForAddedPartition(
}

@Override
protected String getMessagePrefix() {
/** Log/error label for this handler: class name, request id and target table. */
public String toString() {
  return String.format("AddPartitionsHandler [%s] - Add partitions for %s:", id, tableName);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.metastore.handler;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.HMSHandler;
import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.AppendPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.PreAddPartitionEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hive.metastore.HMSHandler.getPartValsFromName;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.canUpdateStats;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.updatePartitionStatsFast;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.validatePartitionNameCharacters;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;

/**
 * Request handler that appends a single partition to an existing partitioned
 * table, creating the partition directory in the warehouse when it does not
 * already exist. The partition is identified either by explicit partition
 * values or by a partition name carried in the {@link AppendPartitionsRequest}.
 */
@RequestHandler(requestBody = AppendPartitionsRequest.class)
public class AppendPartitionHandler
extends AbstractRequestHandler<AppendPartitionsRequest, AppendPartitionHandler.AppendPartitionResult> {
private static final Logger LOG = LoggerFactory.getLogger(AppendPartitionHandler.class);
// State resolved once in beforeExecute() and consumed by execute().
private RawStore ms;
private String catName;
private String dbName;
private String tableName;
private List<String> partVals;
private Table tbl;
private Warehouse wh;

// NOTE(review): the second super() argument is presumably an async/background
// flag on AbstractRequestHandler — confirm against the base class.
AppendPartitionHandler(IHMSHandler handler, AppendPartitionsRequest request) {
super(handler, false, request);
}

/**
 * Validates the request and resolves the target table and partition values.
 *
 * Throws MetaException when neither a partition name nor partition values are
 * supplied, or when the target is a view (no storage location);
 * InvalidObjectException when the table does not exist; and
 * AlreadyExistsException when the partition is already present.
 */
@Override
protected void beforeExecute() throws TException, IOException {
List<String> part_vals = request.getPartVals();
// Normalize identifiers; catalog falls back to the configured default.
dbName = normalizeIdentifier(request.getDbName());
catName = normalizeIdentifier(request.isSetCatalogName() ?
request.getCatalogName() : getDefaultCatalog(handler.getConf()));
tableName = normalizeIdentifier(request.getTableName());
String partName = request.getName();
if (partName == null && (part_vals == null || part_vals.isEmpty())) {
throw new MetaException("The partition values must not be null or empty.");
}

ms = handler.getMS();
wh = handler.getWh();
tbl = ms.getTable(catName, dbName, tableName, null);
if (tbl == null) {
throw new InvalidObjectException(dbName + "." + tableName + " table not found");
}
// A null storage-descriptor location indicates a view, which has no data to append to.
if (tbl.getSd().getLocation() == null) {
throw new MetaException("Cannot append a partition to a view");
}
if (part_vals == null || part_vals.isEmpty()) {
// partition name is set, get partition vals and then append partition
part_vals = getPartValsFromName(tbl, partName);
}
this.partVals = part_vals;
Partition old_part;
try {
old_part = ms.getPartition(catName, dbName, tableName, partVals);
} catch (NoSuchObjectException e) {
// this means there is no existing partition
old_part = null;
}
if (old_part != null) {
throw new AlreadyExistsException("Partition already exists:" + part_vals);
}
LOG.debug("Append partition: {}", part_vals);
validatePartitionNameCharacters(partVals, handler.getConf());
}

/**
 * Creates the partition inside a metastore transaction: fires the pre-add
 * event, derives the partition's storage descriptor and location from the
 * table, creates the directory if absent, stamps the DDL time, optionally
 * updates basic stats, persists the partition, notifies transactional
 * listeners, and commits. On any failure the transaction is rolled back and a
 * directory created by this call is deleted. Non-transactional listeners are
 * always notified with the final success flag.
 */
@Override
protected AppendPartitionResult execute() throws TException, IOException {
Partition part = new Partition();
part.setCatName(catName);
part.setDbName(dbName);
part.setTableName(tableName);
part.setValues(partVals);

boolean success = false, madeDir = false;
Path partLocation = null;
Map<String, String> transactionalListenerResponses = Collections.emptyMap();
Database db = null;
try {
ms.openTransaction();
db = handler.get_database_core(catName, dbName);
((HMSHandler) handler).firePreEvent(new PreAddPartitionEvent(tbl, part, handler));

// Partition inherits the table's storage descriptor; only the location differs.
part.setSd(tbl.getSd().deepCopy());
partLocation = new Path(tbl.getSd().getLocation(), Warehouse
.makePartName(tbl.getPartitionKeys(), partVals));
part.getSd().setLocation(partLocation.toString());

if (!wh.isDir(partLocation)) {
if (!wh.mkdirs(partLocation)) {
throw new MetaException(partLocation
+ " is not a directory or unable to create one");
}
// Remember that we created the directory so rollback can remove it.
madeDir = true;
}

// set create time
long time = System.currentTimeMillis() / 1000;
part.setCreateTime((int) time);
part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
if (canUpdateStats(handler.getConf(), tbl)) {
updatePartitionStatsFast(part, tbl, wh, madeDir, false, request.getEnvironmentContext(), true);
}

if (ms.addPartition(part)) {
// Transactional listeners fire inside the open transaction, before commit.
if (!handler.getTransactionalListeners().isEmpty()) {
transactionalListenerResponses =
MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(),
EventMessage.EventType.ADD_PARTITION,
new AddPartitionEvent(tbl, part, true, handler),
request.getEnvironmentContext());
}

success = ms.commitTransaction();
}
} finally {
if (!success) {
ms.rollbackTransaction();
// Only remove the directory if this call created it (madeDir implies partLocation != null).
if (madeDir) {
wh.deleteDir(partLocation, false, ReplChangeManager.shouldEnableCm(db, tbl));
}
}

// Non-transactional listeners are notified regardless of outcome, with the success flag.
if (!handler.getListeners().isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(handler.getListeners(),
EventMessage.EventType.ADD_PARTITION,
new AddPartitionEvent(tbl, part, success, handler),
request.getEnvironmentContext(),
transactionalListenerResponses, ms);
}
}
return new AppendPartitionResult(part, success);
}

// Log/error label: class name, request id and fully qualified table name.
@Override
public String toString() {
return "AppendPartitionHandler [" + id + "] - Append partition for " +
TableName.getQualified(catName, dbName, tableName) + ":";
}

/** Result carrying the created partition and whether the commit succeeded. */
public record AppendPartitionResult(Partition partition, boolean success) implements Result {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ protected void afterExecute(CreateDatabaseResult result) throws TException, IOEx
}

@Override
protected String getMessagePrefix() {
/** Log/error label for this handler: class name, request id and database name. */
public String toString() {
  return String.format("CreateDatabaseHandler [%s] - Create database %s:", id, name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ protected void beforeExecute() throws TException, IOException {
}

@Override
protected String getMessagePrefix() {
/** Log/error label for this handler: class name, request id and the qualified table name. */
public String toString() {
  String qualified = TableName.getQualified(tbl.getCatName(), tbl.getDbName(), tbl.getTableName());
  return "CreateTableHandler [" + id + "] - create table for " + qualified + ":";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,14 +344,14 @@ private List<Table> sortTablesToDrop() {
}

@Override
protected String getMessagePrefix() {
/** Log/error label for this handler: class name, request id and database name. */
public String toString() {
  return String.format("DropDatabaseHandler [%s] - Drop database %s:", id, name);
}

@Override
protected String getRequestProgress() {
if (progress == null) {
return getMessagePrefix() + " hasn't started yet";
return this + " hasn't started yet";
}
return progress.get();
}
Expand Down
Loading