Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,29 @@ namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaCatalogMetaCacheStatsScanner::_s_tbls_columns = {
{"FE_HOST", TYPE_STRING, sizeof(StringRef), true},
{"CATALOG_NAME", TYPE_STRING, sizeof(StringRef), true},
{"CACHE_NAME", TYPE_STRING, sizeof(StringRef), true},
{"METRIC_NAME", TYPE_STRING, sizeof(StringRef), true},
{"METRIC_VALUE", TYPE_STRING, sizeof(StringRef), true},
{"ENGINE_NAME", TYPE_STRING, sizeof(StringRef), true},
{"ENTRY_NAME", TYPE_STRING, sizeof(StringRef), true},
{"EFFECTIVE_ENABLED", TYPE_BOOLEAN, sizeof(bool), true},
{"CONFIG_ENABLED", TYPE_BOOLEAN, sizeof(bool), true},
{"AUTO_REFRESH", TYPE_BOOLEAN, sizeof(bool), true},
{"TTL_SECOND", TYPE_BIGINT, sizeof(int64_t), true},
{"CAPACITY", TYPE_BIGINT, sizeof(int64_t), true},
{"ESTIMATED_SIZE", TYPE_BIGINT, sizeof(int64_t), true},
{"REQUEST_COUNT", TYPE_BIGINT, sizeof(int64_t), true},
{"HIT_COUNT", TYPE_BIGINT, sizeof(int64_t), true},
{"MISS_COUNT", TYPE_BIGINT, sizeof(int64_t), true},
{"HIT_RATE", TYPE_DOUBLE, sizeof(double), true},
{"LOAD_SUCCESS_COUNT", TYPE_BIGINT, sizeof(int64_t), true},
{"LOAD_FAILURE_COUNT", TYPE_BIGINT, sizeof(int64_t), true},
{"TOTAL_LOAD_TIME_MS", TYPE_BIGINT, sizeof(int64_t), true},
{"AVG_LOAD_PENALTY_MS", TYPE_DOUBLE, sizeof(double), true},
{"EVICTION_COUNT", TYPE_BIGINT, sizeof(int64_t), true},
{"INVALIDATE_COUNT", TYPE_BIGINT, sizeof(int64_t), true},
{"LAST_LOAD_SUCCESS_TIME", TYPE_STRING, sizeof(StringRef), true},
{"LAST_LOAD_FAILURE_TIME", TYPE_STRING, sizeof(StringRef), true},
{"LAST_ERROR", TYPE_STRING, sizeof(StringRef), true},
};

SchemaCatalogMetaCacheStatsScanner::SchemaCatalogMetaCacheStatsScanner()
Expand Down
10 changes: 10 additions & 0 deletions be/src/information_schema/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,16 @@ Status SchemaScanner::insert_block_column(TCell cell, int col_index, Block* bloc
break;
}

case TYPE_FLOAT: {
assert_cast<ColumnFloat32*>(col_ptr)->insert_value(cell.doubleVal);
break;
}

case TYPE_DOUBLE: {
assert_cast<ColumnFloat64*>(col_ptr)->insert_value(cell.doubleVal);
break;
}

case TYPE_BOOLEAN: {
reinterpret_cast<ColumnUInt8*>(col_ptr)->insert_value(cell.boolVal);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HiveExternalMetaCache;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.info.TableNameInfo;
import org.apache.doris.persist.OperationType;
Expand Down Expand Up @@ -196,8 +196,8 @@ public void replayRefreshTable(ExternalObjectLog log) {
&& ((modifiedPartNames != null && !modifiedPartNames.isEmpty())
|| (newPartNames != null && !newPartNames.isEmpty()))) {
// Partition-level cache invalidation, only for hive catalog
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) catalog);
HiveExternalMetaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.hive(catalog.getId());
cache.refreshAffectedPartitionsCache((HMSExternalTable) table.get(), modifiedPartNames, newPartNames);
if (table.get() instanceof HMSExternalTable && log.getLastUpdateTime() > 0) {
((HMSExternalTable) table.get()).setUpdateTime(log.getLastUpdateTime());
Expand Down Expand Up @@ -280,7 +280,11 @@ public void refreshPartitions(String catalogName, String dbName, String tableNam
return;
}

Env.getCurrentEnv().getExtMetaCacheMgr().invalidatePartitionsCache((ExternalTable) table, partitionNames);
ExternalTable externalTable = (ExternalTable) table;
HiveExternalMetaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().hive(externalTable.getCatalog().getId());
for (String partitionName : partitionNames) {
cache.invalidatePartitionCache(externalTable, partitionName);
}
((HMSExternalTable) table).setUpdateTime(updateTime);
}

Expand Down
27 changes: 23 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -624,10 +624,29 @@ public class SchemaTable extends Table {
)
.put("catalog_meta_cache_statistics",
new SchemaTable(SystemIdGenerator.getNextId(), "catalog_meta_cache_statistics", TableType.SCHEMA,
builder().column("CATALOG_NAME", ScalarType.createStringType())
.column("CACHE_NAME", ScalarType.createStringType())
.column("METRIC_NAME", ScalarType.createStringType())
.column("METRIC_VALUE", ScalarType.createStringType())
builder().column("FE_HOST", ScalarType.createStringType())
.column("CATALOG_NAME", ScalarType.createStringType())
.column("ENGINE_NAME", ScalarType.createStringType())
.column("ENTRY_NAME", ScalarType.createStringType())
.column("EFFECTIVE_ENABLED", ScalarType.createType(PrimitiveType.BOOLEAN))
.column("CONFIG_ENABLED", ScalarType.createType(PrimitiveType.BOOLEAN))
.column("AUTO_REFRESH", ScalarType.createType(PrimitiveType.BOOLEAN))
.column("TTL_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
.column("CAPACITY", ScalarType.createType(PrimitiveType.BIGINT))
.column("ESTIMATED_SIZE", ScalarType.createType(PrimitiveType.BIGINT))
.column("REQUEST_COUNT", ScalarType.createType(PrimitiveType.BIGINT))
.column("HIT_COUNT", ScalarType.createType(PrimitiveType.BIGINT))
.column("MISS_COUNT", ScalarType.createType(PrimitiveType.BIGINT))
.column("HIT_RATE", ScalarType.createType(PrimitiveType.DOUBLE))
.column("LOAD_SUCCESS_COUNT", ScalarType.createType(PrimitiveType.BIGINT))
.column("LOAD_FAILURE_COUNT", ScalarType.createType(PrimitiveType.BIGINT))
.column("TOTAL_LOAD_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT))
.column("AVG_LOAD_PENALTY_MS", ScalarType.createType(PrimitiveType.DOUBLE))
.column("EVICTION_COUNT", ScalarType.createType(PrimitiveType.BIGINT))
.column("INVALIDATE_COUNT", ScalarType.createType(PrimitiveType.BIGINT))
.column("LAST_LOAD_SUCCESS_TIME", ScalarType.createStringType())
.column("LAST_LOAD_FAILURE_TIME", ScalarType.createStringType())
.column("LAST_ERROR", ScalarType.createStringType())
.build())
)
.put("backend_kerberos_ticket_cache",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HiveExternalMetaCache;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
Expand Down Expand Up @@ -177,7 +177,7 @@ public static List<String> parseColumnsFromPath(
if (index == -1) {
continue;
}
columns[index] = HiveMetaStoreCache.HIVE_DEFAULT_PARTITION.equals(pair[1])
columns[index] = HiveExternalMetaCache.HIVE_DEFAULT_PARTITION.equals(pair[1])
? FeConstants.null_string : pair[1];
size++;
if (size >= columnsFromPath.size()) {
Expand Down
149 changes: 117 additions & 32 deletions fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.DdlException;
Expand All @@ -40,7 +41,10 @@
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveExternalMetaCache;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.nereids.trees.plans.commands.CreateCatalogCommand;
import org.apache.doris.persist.OperationType;
import org.apache.doris.persist.gson.GsonPostProcessable;
Expand Down Expand Up @@ -120,21 +124,44 @@ private void addCatalog(CatalogIf catalog) {
}
}

private CatalogIf removeCatalog(long catalogId) {
CatalogIf catalog = idToCatalog.remove(catalogId);
LOG.info("Removed catalog with id {}, name {}", catalogId, catalog == null ? "N/A" : catalog.getName());
if (catalog != null) {
Env.getCurrentEnv().getRefreshManager().removeFromRefreshMap(catalogId);
catalog.onClose();
Env.getCurrentEnv().getConstraintManager().dropCatalogConstraints(catalog.getName());
nameToCatalog.remove(catalog.getName());
if (ConnectContext.get() != null) {
ConnectContext.get().removeLastDBOfCatalog(catalog.getName());
}
Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getId());
Env.getCurrentEnv().getQueryStats().clear(catalog.getId());
private RemovedCatalog removeCatalog(long catalogId) {
CatalogIf catalog = idToCatalog.get(catalogId);
if (catalog == null) {
return null;
}
String catalogName = catalog.getName();
Env.getCurrentEnv().getRefreshManager().removeFromRefreshMap(catalogId);
idToCatalog.remove(catalogId);
nameToCatalog.remove(catalogName);
return new RemovedCatalog(catalog, catalogName);
}

private void cleanupRemovedCatalog(RemovedCatalog removedCatalog) {
if (removedCatalog == null) {
return;
}
CatalogIf catalog = removedCatalog.catalog;
catalog.onClose();
Env.getCurrentEnv().getConstraintManager().dropCatalogConstraints(removedCatalog.catalogName);
ConnectContext ctx = ConnectContext.get();
if (ctx != null) {
ctx.removeLastDBOfCatalog(removedCatalog.catalogName);
}
Env.getCurrentEnv().getExtMetaCacheMgr().removeCatalog(removedCatalog.catalogId);
Env.getCurrentEnv().getQueryStats().clear(removedCatalog.catalogId);
LOG.info("Removed catalog with id {}, name {}", removedCatalog.catalogId, removedCatalog.catalogName);
}

private static final class RemovedCatalog {
private final CatalogIf catalog;
private final String catalogName;
private final long catalogId;

private RemovedCatalog(CatalogIf catalog, String catalogName) {
this.catalog = catalog;
this.catalogName = catalogName;
this.catalogId = catalog.getId();
}
return catalog;
}

public InternalCatalog getInternalCatalog() {
Expand Down Expand Up @@ -255,6 +282,7 @@ public void createCatalog(CreateCatalogCommand cmd) throws UserException {
* Remove the catalog instance by name and write the meta log.
*/
public void dropCatalog(String catalogName, boolean ifExists) throws UserException {
RemovedCatalog removedCatalog = null;
writeLock();
try {
if (ifExists && !nameToCatalog.containsKey(catalogName)) {
Expand All @@ -267,23 +295,24 @@ public void dropCatalog(String catalogName, boolean ifExists) throws UserExcepti
}
CatalogLog log = new CatalogLog();
log.setCatalogId(catalog.getId());
replayDropCatalog(log);
removedCatalog = removeCatalog(log.getCatalogId());
Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_DROP_CATALOG, log);

if (ConnectContext.get() != null) {
ConnectContext.get().removeLastDBOfCatalog(catalogName);
}
Env.getCurrentEnv().getQueryStats().clear(catalog.getId());
LOG.info("finished to drop catalog {}:{}", catalog.getName(), catalog.getId());
} finally {
writeUnlock();
cleanupRemovedCatalog(removedCatalog);
}
if (removedCatalog == null) {
return;
}
LOG.info("finished to drop catalog {}:{}", removedCatalog.catalogName, removedCatalog.catalogId);
}

/**
* Modify the catalog name into a new one and write the meta log.
*/
public void alterCatalogName(String catalogName, String newCatalogName) throws UserException {
RemovedCatalog removedCatalog = null;
String lastDb = null;
writeLock();
try {
CatalogIf catalog = nameToCatalog.get(catalogName);
Expand All @@ -296,17 +325,49 @@ public void alterCatalogName(String catalogName, String newCatalogName) throws U
CatalogLog log = new CatalogLog();
log.setCatalogId(catalog.getId());
log.setNewCatalogName(newCatalogName);
replayAlterCatalogName(log);
Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_ALTER_CATALOG_NAME, log);
ConnectContext ctx = ConnectContext.get();
if (ctx != null) {
lastDb = ctx.getLastDBOfCatalog(catalogName);
}
removedCatalog = removeCatalog(log.getCatalogId());
} finally {
writeUnlock();
}
cleanupRemovedCatalog(removedCatalog);
if (removedCatalog == null) {
throw new IllegalStateException("No catalog found with name: " + catalogName);
}

writeLock();
try {
DdlException ddlException = null;
CatalogIf catalog = removedCatalog.catalog;
if (nameToCatalog.get(newCatalogName) != null) {
addCatalog(catalog);
ddlException = new DdlException("Catalog with name " + newCatalogName + " already exist");
} else {
catalog.modifyCatalogName(newCatalogName);
addCatalog(catalog);

CatalogLog log = new CatalogLog();
log.setCatalogId(catalog.getId());
log.setNewCatalogName(newCatalogName);
Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_ALTER_CATALOG_NAME, log);
}

ConnectContext ctx = ConnectContext.get();
if (ctx != null) {
String db = ctx.getLastDBOfCatalog(catalogName);
if (db != null) {
ctx.removeLastDBOfCatalog(catalogName);
ctx.addLastDBOfCatalog(log.getNewCatalogName(), db);
if (lastDb != null) {
if (ddlException == null) {
ctx.addLastDBOfCatalog(newCatalogName, lastDb);
} else {
ctx.addLastDBOfCatalog(catalogName, lastDb);
}
}
}
if (ddlException != null) {
throw ddlException;
}
} finally {
writeUnlock();
}
Expand Down Expand Up @@ -499,22 +560,37 @@ private void createCatalogInternal(CatalogIf catalog, boolean isReplay) throws D
* Reply for drop catalog event.
*/
public void replayDropCatalog(CatalogLog log) {
RemovedCatalog removedCatalog;
writeLock();
try {
removeCatalog(log.getCatalogId());
removedCatalog = removeCatalog(log.getCatalogId());
} finally {
writeUnlock();
}
cleanupRemovedCatalog(removedCatalog);
}

/**
* Reply for alter catalog name event.
*/
public void replayAlterCatalogName(CatalogLog log) {
RemovedCatalog removedCatalog;
writeLock();
try {
removedCatalog = removeCatalog(log.getCatalogId());
} finally {
writeUnlock();
}
cleanupRemovedCatalog(removedCatalog);

if (removedCatalog == null) {
throw new IllegalStateException("No catalog found with id: " + log.getCatalogId());
}
CatalogIf catalog = removedCatalog.catalog;
catalog.modifyCatalogName(log.getNewCatalogName());

writeLock();
try {
CatalogIf catalog = removeCatalog(log.getCatalogId());
catalog.modifyCatalogName(log.getNewCatalogName());
addCatalog(catalog);
} finally {
writeUnlock();
Expand Down Expand Up @@ -726,7 +802,15 @@ public void addExternalPartitions(String catalogName, String dbName, String tabl
}

HMSExternalTable hmsTable = (HMSExternalTable) table;
Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), hmsTable, partitionNames);
List<Type> partitionColumnTypes;
try {
partitionColumnTypes = hmsTable.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(hmsTable));
} catch (NotSupportedException e) {
LOG.warn("Ignore not supported hms table, message: {} ", e.getMessage());
return;
}
HiveExternalMetaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().hive(catalog.getId());
cache.addPartitionsCache(hmsTable.getOrBuildNameMapping(), partitionNames, partitionColumnTypes);
hmsTable.setUpdateTime(updateTime);
}

Expand Down Expand Up @@ -757,7 +841,8 @@ public void dropExternalPartitions(String catalogName, String dbName, String tab
}

HMSExternalTable hmsTable = (HMSExternalTable) table;
Env.getCurrentEnv().getExtMetaCacheMgr().dropPartitionsCache(catalog.getId(), hmsTable, partitionNames);
Env.getCurrentEnv().getExtMetaCacheMgr().hive(catalog.getId())
.dropPartitionsCache(hmsTable, partitionNames, true);
hmsTable.setUpdateTime(updateTime);
}

Expand Down
Loading
Loading