Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions be/src/vec/exec/scan/meta_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ Status MetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
case TMetadataType::PARTITION_VALUES:
RETURN_IF_ERROR(_build_partition_values_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::TABLETS:
RETURN_IF_ERROR(_build_tablets_metadata_request(meta_scan_range, &request));
break;
default:
_meta_eos = true;
return Status::OK();
Expand Down Expand Up @@ -529,6 +532,24 @@ Status MetaScanner::_build_partition_values_metadata_request(
return Status::OK();
}

Status MetaScanner::_build_tablets_metadata_request(
const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "MetaScanner::_build_tablets_metadata_request";
if (!meta_scan_range.__isset.tablets_params) {
return Status::InternalError("Can not find TTabletsMetadataParams from meta_scan_range.");
}
// create request
request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);

// create TMetadataTableRequestParams
TMetadataTableRequestParams metadata_table_params;
metadata_table_params.__set_metadata_type(TMetadataType::TABLETS);
metadata_table_params.__set_tablets_metadata_params(meta_scan_range.tablets_params);

request->__set_metada_table_params(metadata_table_params);
return Status::OK();
}

Status MetaScanner::close(RuntimeState* state) {
VLOG_CRITICAL << "MetaScanner::close";
if (!_try_close()) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/meta_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class MetaScanner : public Scanner {
TFetchSchemaTableDataRequest* request);
Status _build_partition_values_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_tablets_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
bool _meta_eos;
TupleId _tuple_id;
TUserIdentity _user_identity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import org.apache.doris.nereids.trees.expressions.functions.table.Partitions;
import org.apache.doris.nereids.trees.expressions.functions.table.Query;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.expressions.functions.table.Tablets;
import org.apache.doris.nereids.trees.expressions.functions.table.Tasks;
import org.apache.doris.tablefunction.TabletsTableValuedFunction;

import com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -77,7 +79,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
tableValued(ParquetMeta.class, "parquet_meta"),
tableValued(ParquetFileMetadata.class, "parquet_file_metadata"),
tableValued(ParquetKvMetadata.class, "parquet_kv_metadata"),
tableValued(ParquetBloomProbe.class, "parquet_bloom_probe")
tableValued(ParquetBloomProbe.class, "parquet_bloom_probe"),
tableValued(Tablets.class, TabletsTableValuedFunction.NAME)
);

public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.statistics.query.QueryStatsUtil;
import org.apache.doris.system.Backend;
import org.apache.doris.tablefunction.TabletsTableValuedFunction;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.ArrayList;
Expand All @@ -48,24 +48,6 @@
* show tablets' detail info within an index
*/
public class TabletsProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES;

static {
ImmutableList.Builder<String> builder = new ImmutableList.Builder<String>()
.add("TabletId").add("ReplicaId").add("BackendId").add("SchemaHash").add("Version")
.add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
.add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State")
.add("LstConsistencyCheckTime").add("CheckVersion")
.add("VisibleVersionCount").add("VersionCount").add("QueryHits").add("PathHash").add("Path")
.add("MetaUrl").add("CompactionStatus")
.add("CooldownReplicaId").add("CooldownMetaId");

if (Config.isCloudMode()) {
builder.add("PrimaryBackendId");
}

TITLE_NAMES = builder.build();
}

private Table table;
private MaterializedIndex index;
Expand Down Expand Up @@ -197,7 +179,7 @@ public List<List<Comparable>> fetchComparableResult(long version, long backendId
return tabletInfos;
}

private List<List<Comparable>> fetchComparableResult() throws AnalysisException {
public List<List<Comparable>> fetchComparableResult() throws AnalysisException {
return fetchComparableResult(-1, -1, null);
}

Expand All @@ -210,7 +192,7 @@ public ProcResult fetchResult() throws AnalysisException {

// set result
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
result.setNames(TabletsTableValuedFunction.getTabletsTitleNames());

for (int i = 0; i < tabletInfos.size(); i++) {
List<Comparable> info = tabletInfos.get(i);
Expand Down Expand Up @@ -250,9 +232,9 @@ public ProcNodeInterface lookup(String tabletIdStr) throws AnalysisException {
}

public static int analyzeColumn(String columnName) throws AnalysisException {
for (String title : TITLE_NAMES) {
for (String title : TabletsTableValuedFunction.getTabletsTitleNames()) {
if (title.equalsIgnoreCase(columnName)) {
return TITLE_NAMES.indexOf(title);
return TabletsTableValuedFunction.getTabletsTitleNames().indexOf(title);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.expressions.functions.table;

import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.tablefunction.TabletsTableValuedFunction;

import java.util.Map;

/**
* tablets
*/
public class Tablets extends TableValuedFunction {

public Tablets(Properties tvfProperties) {
super(TabletsTableValuedFunction.NAME, tvfProperties);
}

@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return TabletsTableValuedFunction.create(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build TabletsTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}

@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}

@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitTablets(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.doris.nereids.trees.expressions.functions.table.Query;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
import org.apache.doris.nereids.trees.expressions.functions.table.Tablets;
import org.apache.doris.nereids.trees.expressions.functions.table.Tasks;

/** TableValuedFunctionVisitor */
Expand Down Expand Up @@ -132,4 +133,8 @@ default R visitS3(S3 s3, C context) {
default R visitQuery(Query query, C context) {
return visitTableValuedFunction(query, context);
}

default R visitTablets(Tablets tablets, C context) {
return visitTableValuedFunction(tablets, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.tablefunction.TabletsTableValuedFunction;

import com.google.common.collect.Lists;

Expand Down Expand Up @@ -276,7 +277,7 @@ public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exc
*/
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
for (String title : TabletsProcDir.TITLE_NAMES) {
for (String title : TabletsTableValuedFunction.getTabletsTitleNames()) {
builder.addColumn(new Column(title, ScalarType.createVarchar(128)));
}
return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
Expand All @@ -43,9 +44,12 @@
import org.apache.doris.catalog.View;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.FrontendsProcNode;
import org.apache.doris.common.proc.PartitionsProcDir;
import org.apache.doris.common.proc.TabletsProcDir;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
Expand Down Expand Up @@ -105,6 +109,7 @@
import org.apache.doris.thrift.TSchemaTableRequestParams;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTabletsMetadataParams;
import org.apache.doris.thrift.TTasksMetadataParams;
import org.apache.doris.thrift.TUserIdentity;

Expand All @@ -115,6 +120,8 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
Expand All @@ -128,7 +135,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MetadataGenerator {
Expand Down Expand Up @@ -279,6 +288,9 @@ public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableData
case PARTITION_VALUES:
result = partitionValuesMetadataResult(params);
break;
case TABLETS:
result = tabletsMetadataResult(params);
break;
default:
return errorResult("Metadata table params is not set.");
}
Expand Down Expand Up @@ -1928,4 +1940,72 @@ private static List<TRow> partitionValuesMetadataResultForHmsTable(HMSExternalTa
return dataBatch;
}

private static TFetchSchemaTableDataResult tabletsMetadataResult(TMetadataTableRequestParams params)
throws TException {
if (!params.isSetTabletsMetadataParams()) {
return errorResult("tablets metadata param is not set.");
}
TTabletsMetadataParams tabletsMetadataParams = params.getTabletsMetadataParams();
String databaseName = tabletsMetadataParams.getDatabaseName();
String tableName = tabletsMetadataParams.getTableName();
List<String> partitionNames = tabletsMetadataParams.getPartitionNames();
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
List<TRow> dataBatch = Lists.newArrayList();
Database db;
OlapTable olapTable = null;
try {
// check access first
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "tablets tvf");
}
Env env = Env.getCurrentEnv();
db = env.getInternalCatalog().getDbOrAnalysisException(databaseName);
olapTable = db.getOlapTableOrAnalysisException(tableName);
olapTable.readLock();
Collection<Partition> tablePartitions = olapTable.getPartitions();
List<String> tablePartitionNames = tablePartitions.stream()
.map(Partition::getName).collect(Collectors.toList());
Collection<Partition> targetPartitions;
if (CollectionUtils.isNotEmpty(partitionNames)) {
Set<String> existedPartitionNames = partitionNames.stream()
.filter(p -> StringUtils.isNotEmpty(p) && StringUtils.isNotBlank(p)
&& tablePartitionNames.contains(p))
.collect(Collectors.toSet());
targetPartitions = tablePartitions.stream()
.filter(t -> existedPartitionNames.contains(t.getName())).collect(Collectors.toList());
} else {
targetPartitions = new ArrayList<>(tablePartitions);
}
List<List<Comparable>> tabletInfos = new ArrayList<>();
for (Partition partition : targetPartitions) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
TabletsProcDir procDir = new TabletsProcDir(olapTable, index);
List<List<Comparable>> fetchedComparableResult = procDir.fetchComparableResult();
tabletInfos.addAll(fetchedComparableResult);
}
}
for (List<Comparable> tabletInfo : tabletInfos) {
TRow trow = new TRow();
for (Comparable item : tabletInfo) {
if (item != null) {
trow.addToColumnValue(new TCell().setStringVal(item.toString()));
} else {
trow.addToColumnValue(new TCell().setStringVal("NULL"));
}
}
dataBatch.add(trow);
}
result.setDataBatch(dataBatch);
result.setStatus(new TStatus(TStatusCode.OK));
} catch (Exception e) {
LOG.warn("error when fetching tablets metadata, db: {}, table: {}, partitions: {}",
databaseName, tableName, partitionNames);
throw new TException(e);
} finally {
if (olapTable != null) {
olapTable.readUnlock();
}
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map<String
return new FileTableValuedFunction(params);
case HttpTableValuedFunction.NAME:
return new HttpTableValuedFunction(params);
case TabletsTableValuedFunction.NAME:
return TabletsTableValuedFunction.create(params);
default:
throw new AnalysisException("Could not find table function " + funcName);
}
Expand Down
Loading