Skip to content

Commit 9fb6958

Browse files
authored
HADOOP-19736: ABFS. Support for new auth type: User-bound SAS (#8051)
Contributed by Manika Joshi
1 parent 14f5055 commit 9fb6958

24 files changed

+1129
-148
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1240,9 +1240,9 @@ public int getNumLeaseThreads() {
12401240
}
12411241

12421242
public boolean getCreateRemoteFileSystemDuringInitialization() {
1243-
// we do not support creating the filesystem when AuthType is SAS
1243+
// we do not support creating the filesystem when AuthType is SAS or UserboundSASWithOAuth
12441244
return this.createRemoteFileSystemDuringInitialization
1245-
&& this.getAuthType(this.accountName) != AuthType.SAS;
1245+
&& !(validateForSASType(this.getAuthType(this.accountName)));
12461246
}
12471247

12481248
public boolean getSkipUserGroupMetadataDuringInitialization() {
@@ -1413,9 +1413,14 @@ public boolean shouldTrackLatency() {
14131413
return this.trackLatency;
14141414
}
14151415

1416+
public boolean validateForSASType(AuthType authType){
1417+
return authType == AuthType.SAS
1418+
|| authType == AuthType.UserboundSASWithOAuth;
1419+
}
1420+
14161421
public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException {
14171422
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
1418-
if (authType == AuthType.OAuth) {
1423+
if (authType == AuthType.OAuth || authType == AuthType.UserboundSASWithOAuth) {
14191424
try {
14201425
Class<? extends AccessTokenProvider> tokenProviderClass =
14211426
getTokenProviderClass(authType,
@@ -1563,7 +1568,7 @@ public AccessTokenProvider getTokenProvider() throws TokenAccessProviderExceptio
15631568
* the AbfsConfiguration with which a filesystem is initialized, and eliminate
15641569
* chances of dynamic modifications and spurious situations.<br>
15651570
* @return sasTokenProvider object based on configurations provided
1566-
* @throws AzureBlobFileSystemException
1571+
* @throws AzureBlobFileSystemException if SAS token provider initialization fails
15671572
*/
15681573
public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemException {
15691574
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
@@ -1610,6 +1615,70 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio
16101615
}
16111616
}
16121617

1618+
/**
1619+
* Returns the SASTokenProvider implementation to be used to generate user-bound SAS token.
1620+
* Custom implementation of {@link SASTokenProvider} under th config
1621+
* "fs.azure.sas.token.provider.type" needs to be provided.
1622+
* @param authType authentication type
1623+
* @return sasTokenProvider object based on configurations provided
1624+
* @throws AzureBlobFileSystemException is user-bound SAS token provider initialization fails
1625+
*/
1626+
public SASTokenProvider getUserBoundSASTokenProvider(AuthType authType)
1627+
throws AzureBlobFileSystemException {
1628+
1629+
try {
1630+
Class<? extends SASTokenProvider> customSasTokenProviderImplementation =
1631+
getTokenProviderClass(authType, FS_AZURE_SAS_TOKEN_PROVIDER_TYPE,
1632+
null, SASTokenProvider.class);
1633+
1634+
if (customSasTokenProviderImplementation == null) {
1635+
throw new SASTokenProviderException(String.format(
1636+
"\"%s\" must be set for user-bound SAS auth type.",
1637+
FS_AZURE_SAS_TOKEN_PROVIDER_TYPE));
1638+
}
1639+
1640+
SASTokenProvider sasTokenProvider = ReflectionUtils.newInstance(
1641+
customSasTokenProviderImplementation, rawConfig);
1642+
if (sasTokenProvider == null) {
1643+
throw new SASTokenProviderException(String.format(
1644+
"Failed to initialize %s", customSasTokenProviderImplementation));
1645+
}
1646+
LOG.trace("Initializing {}", customSasTokenProviderImplementation.getName());
1647+
sasTokenProvider.initialize(rawConfig, accountName);
1648+
LOG.trace("{} init complete", customSasTokenProviderImplementation.getName());
1649+
return sasTokenProvider;
1650+
} catch (SASTokenProviderException e) {
1651+
throw e;
1652+
} catch (Exception e) {
1653+
throw new SASTokenProviderException(
1654+
"Unable to load user-bound SAS token provider class: " + e, e);
1655+
}
1656+
}
1657+
1658+
/**
1659+
* Returns both the AccessTokenProvider and the SASTokenProvider
1660+
* when auth type is UserboundSASWithOAuth.
1661+
*
1662+
* @return Object[] where:
1663+
* [0] = AccessTokenProvider
1664+
* [1] = SASTokenProvider
1665+
* @throws AzureBlobFileSystemException if provider initialization fails
1666+
*/
1667+
public Object[] getUserBoundSASBothTokenProviders()
1668+
throws AzureBlobFileSystemException {
1669+
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME,
1670+
AuthType.SharedKey);
1671+
if (authType != AuthType.UserboundSASWithOAuth) {
1672+
throw new SASTokenProviderException(String.format(
1673+
"Invalid auth type: %s is being used, expecting user-bound SAS.",
1674+
authType));
1675+
}
1676+
1677+
AccessTokenProvider tokenProvider = getTokenProvider();
1678+
SASTokenProvider sasTokenProvider = getUserBoundSASTokenProvider(authType);
1679+
return new Object[]{tokenProvider, sasTokenProvider};
1680+
}
1681+
16131682
public EncryptionContextProvider createEncryptionContextProvider() {
16141683
try {
16151684
String configKey = FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_ON_ROOT;
134134
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_INVALID_ABFS_STATE;
135135
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.UNAUTHORIZED_SAS;
136+
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.UNAUTHORIZED_USER_BOUND_SAS;
136137
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
137138
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
138139
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@@ -271,6 +272,29 @@ public void initialize(URI uri, Configuration configuration)
271272
throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
272273
}
273274

275+
/**
276+
* Validates that User-bound SAS with OAuth is not used for FNS (non-hierarchical namespace) accounts.
277+
* Throws an InvalidConfigurationValueException if this configuration is detected.
278+
*
279+
* @throws InvalidConfigurationValueException if User-bound SAS with OAuth is configured for FNS accounts
280+
*/
281+
try {
282+
if (abfsConfiguration.getAuthType(abfsConfiguration.getAccountName())
283+
== AuthType.UserboundSASWithOAuth && // Auth type is User-bound SAS
284+
!tryGetIsNamespaceEnabled(
285+
new TracingContext(initFSTracingContext))) { // Account is FNS
286+
throw new InvalidConfigurationValueException(UNAUTHORIZED_USER_BOUND_SAS);
287+
}
288+
} catch (InvalidConfigurationValueException ex) {
289+
LOG.error("User-bound SAS not supported for FNS Accounts", ex);
290+
throw ex;
291+
} catch (AzureBlobFileSystemException ex) {
292+
LOG.error("Failed to determine account type for auth type validation",
293+
ex);
294+
throw new InvalidConfigurationValueException(
295+
FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
296+
}
297+
274298
/*
275299
* Non-hierarchical-namespace account can not have a customer-provided-key(CPK).
276300
* Fail initialization of filesystem if the configs are provided. CPK is of

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1754,11 +1754,20 @@ private void initializeClient(URI uri, String fileSystemName,
17541754
} else if (authType == AuthType.SAS) {
17551755
LOG.trace("Fetching SAS Token Provider");
17561756
sasTokenProvider = abfsConfiguration.getSASTokenProvider();
1757+
} else if (authType == AuthType.UserboundSASWithOAuth) {
1758+
LOG.trace("Fetching SAS and OAuth Token Provider for user bound SAS");
1759+
AzureADAuthenticator.init(abfsConfiguration);
1760+
Object[] providers
1761+
= abfsConfiguration.getUserBoundSASBothTokenProviders();
1762+
tokenProvider = (AccessTokenProvider) providers[0];
1763+
sasTokenProvider = (SASTokenProvider) providers[1];
1764+
ExtensionHelper.bind(tokenProvider, uri,
1765+
abfsConfiguration.getRawConfiguration());
17571766
} else {
17581767
LOG.trace("Fetching token provider");
17591768
tokenProvider = abfsConfiguration.getTokenProvider();
17601769
ExtensionHelper.bind(tokenProvider, uri,
1761-
abfsConfiguration.getRawConfiguration());
1770+
abfsConfiguration.getRawConfiguration());
17621771
}
17631772

17641773
// Encryption setup
@@ -1782,16 +1791,11 @@ private void initializeClient(URI uri, String fileSystemName,
17821791
}
17831792
}
17841793

1785-
LOG.trace("Initializing AbfsClient for {}", baseUrl);
1786-
if (tokenProvider != null) {
1787-
this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration,
1788-
tokenProvider, encryptionContextProvider,
1789-
populateAbfsClientContext());
1790-
} else {
1791-
this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration,
1792-
sasTokenProvider, encryptionContextProvider,
1793-
populateAbfsClientContext());
1794-
}
1794+
LOG.trace("Initializing AbfsClientHandler for {}", baseUrl);
1795+
this.clientHandler = new AbfsClientHandler(baseUrl, creds,
1796+
abfsConfiguration,
1797+
tokenProvider, sasTokenProvider, encryptionContextProvider,
1798+
populateAbfsClientContext());
17951799

17961800
this.setClient(getClientHandler().getClient());
17971801
LOG.trace("AbfsClient init complete");

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ public final class AbfsHttpConstants {
147147
public static final String APPLICATION_JSON = "application/json";
148148
public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
149149
public static final String APPLICATION_XML = "application/xml";
150+
public static final String APPLICATION_X_WWW_FORM_URLENCODED = "application/x-www-form-urlencoded";
150151
public static final String XMS_PROPERTIES_ENCODING_ASCII = "ISO-8859-1";
151152
public static final String XMS_PROPERTIES_ENCODING_UNICODE = "UTF-8";
152153

@@ -189,7 +190,8 @@ public enum ApiVersion {
189190
DEC_12_2019("2019-12-12"),
190191
APR_10_2021("2021-04-10"),
191192
AUG_03_2023("2023-08-03"),
192-
NOV_04_2024("2024-11-04");
193+
NOV_04_2024("2024-11-04"),
194+
JUL_05_2025("2025-07-05");
193195

194196
private final String xMsApiVersion;
195197

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -186,22 +186,10 @@ public AbfsBlobClient(final URL baseUrl,
186186
final SharedKeyCredentials sharedKeyCredentials,
187187
final AbfsConfiguration abfsConfiguration,
188188
final AccessTokenProvider tokenProvider,
189-
final EncryptionContextProvider encryptionContextProvider,
190-
final AbfsClientContext abfsClientContext) throws IOException {
191-
super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
192-
encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
193-
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
194-
abfsConfiguration.getAzureAtomicRenameDirs()
195-
.split(AbfsHttpConstants.COMMA)));
196-
}
197-
198-
public AbfsBlobClient(final URL baseUrl,
199-
final SharedKeyCredentials sharedKeyCredentials,
200-
final AbfsConfiguration abfsConfiguration,
201189
final SASTokenProvider sasTokenProvider,
202190
final EncryptionContextProvider encryptionContextProvider,
203191
final AbfsClientContext abfsClientContext) throws IOException {
204-
super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
192+
super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, sasTokenProvider,
205193
encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
206194
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
207195
abfsConfiguration.getAzureAtomicRenameDirs()

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -344,22 +344,24 @@ private AbfsClient(final URL baseUrl,
344344
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
345345
}
346346

347-
public AbfsClient(final URL baseUrl,
348-
final SharedKeyCredentials sharedKeyCredentials,
349-
final AbfsConfiguration abfsConfiguration,
350-
final AccessTokenProvider tokenProvider,
351-
final EncryptionContextProvider encryptionContextProvider,
352-
final AbfsClientContext abfsClientContext,
353-
final AbfsServiceType abfsServiceType)
354-
throws IOException {
355-
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
356-
encryptionContextProvider, abfsClientContext, abfsServiceType);
357-
this.tokenProvider = tokenProvider;
358-
}
359347

348+
/**
349+
* Constructs an AbfsClient instance with all authentication and configuration options.
350+
*
351+
* @param baseUrl The base URL for the ABFS endpoint.
352+
* @param sharedKeyCredentials Shared key credentials for authentication.
353+
* @param abfsConfiguration The ABFS configuration.
354+
* @param tokenProvider The access token provider for OAuth authentication.
355+
* @param sasTokenProvider The SAS token provider for SAS authentication.
356+
* @param encryptionContextProvider The encryption context provider.
357+
* @param abfsClientContext The client context
358+
* @param abfsServiceType The ABFS service type (e.g., Blob, DFS).
359+
* @throws IOException if initialization fails.
360+
*/
360361
public AbfsClient(final URL baseUrl,
361362
final SharedKeyCredentials sharedKeyCredentials,
362363
final AbfsConfiguration abfsConfiguration,
364+
final AccessTokenProvider tokenProvider,
363365
final SASTokenProvider sasTokenProvider,
364366
final EncryptionContextProvider encryptionContextProvider,
365367
final AbfsClientContext abfsClientContext,
@@ -368,6 +370,7 @@ public AbfsClient(final URL baseUrl,
368370
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
369371
encryptionContextProvider, abfsClientContext, abfsServiceType);
370372
this.sasTokenProvider = sasTokenProvider;
373+
this.tokenProvider = tokenProvider;
371374
}
372375

373376
@Override
@@ -1179,7 +1182,7 @@ protected String appendSASTokenToQuery(String path,
11791182
String cachedSasToken)
11801183
throws SASTokenProviderException {
11811184
String sasToken = null;
1182-
if (this.authType == AuthType.SAS) {
1185+
if (getAbfsConfiguration().validateForSASType(this.authType)) {
11831186
try {
11841187
LOG.trace("Fetch SAS token for {} on {}", operation, path);
11851188
if (cachedSasToken == null) {

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java

Lines changed: 32 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -47,24 +47,26 @@ public class AbfsClientHandler implements Closeable {
4747
private final AbfsDfsClient dfsAbfsClient;
4848
private final AbfsBlobClient blobAbfsClient;
4949

50-
public AbfsClientHandler(final URL baseUrl,
51-
final SharedKeyCredentials sharedKeyCredentials,
52-
final AbfsConfiguration abfsConfiguration,
53-
final AccessTokenProvider tokenProvider,
54-
final EncryptionContextProvider encryptionContextProvider,
55-
final AbfsClientContext abfsClientContext) throws IOException {
56-
this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
57-
abfsConfiguration, tokenProvider, null, encryptionContextProvider,
58-
abfsClientContext);
59-
this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
60-
abfsConfiguration, tokenProvider, null, encryptionContextProvider,
61-
abfsClientContext);
62-
initServiceType(abfsConfiguration);
63-
}
6450

51+
/**
52+
* Constructs an AbfsClientHandler instance.
53+
*
54+
* Initializes the default and ingress service types from the provided configuration,
55+
* then creates both DFS and Blob clients using the given params
56+
*
57+
* @param baseUrl the base URL for the file system.
58+
* @param sharedKeyCredentials credentials for shared key authentication.
59+
* @param abfsConfiguration the ABFS configuration.
60+
* @param tokenProvider the access token provider, may be null.
61+
* @param sasTokenProvider the SAS token provider, may be null.
62+
* @param encryptionContextProvider the encryption context provider
63+
* @param abfsClientContext the ABFS client context.
64+
* @throws IOException if client creation or URL conversion fails.
65+
*/
6566
public AbfsClientHandler(final URL baseUrl,
6667
final SharedKeyCredentials sharedKeyCredentials,
6768
final AbfsConfiguration abfsConfiguration,
69+
final AccessTokenProvider tokenProvider,
6870
final SASTokenProvider sasTokenProvider,
6971
final EncryptionContextProvider encryptionContextProvider,
7072
final AbfsClientContext abfsClientContext) throws IOException {
@@ -73,10 +75,10 @@ public AbfsClientHandler(final URL baseUrl,
7375
// only for default client.
7476
initServiceType(abfsConfiguration);
7577
this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
76-
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
78+
abfsConfiguration, tokenProvider, sasTokenProvider, encryptionContextProvider,
7779
abfsClientContext);
7880
this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
79-
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
81+
abfsConfiguration, tokenProvider, sasTokenProvider, encryptionContextProvider,
8082
abfsClientContext);
8183
}
8284

@@ -154,17 +156,13 @@ private AbfsDfsClient createDfsClient(final URL baseUrl,
154156
final EncryptionContextProvider encryptionContextProvider,
155157
final AbfsClientContext abfsClientContext) throws IOException {
156158
URL dfsUrl = changeUrlFromBlobToDfs(baseUrl);
157-
if (tokenProvider != null) {
158-
LOG.debug("Creating AbfsDfsClient with access token provider using the URL: {}", dfsUrl);
159-
return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
160-
tokenProvider, encryptionContextProvider,
161-
abfsClientContext);
162-
} else {
163-
LOG.debug("Creating AbfsDfsClient with SAS token provider using the URL: {}", dfsUrl);
164-
return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
165-
sasTokenProvider, encryptionContextProvider,
166-
abfsClientContext);
167-
}
159+
LOG.debug(
160+
"Creating AbfsDfsClient with access token provider: %s and "
161+
+ "SAS token provider: %s using the URL: %s",
162+
tokenProvider, sasTokenProvider, dfsUrl);
163+
return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
164+
tokenProvider, sasTokenProvider, encryptionContextProvider,
165+
abfsClientContext);
168166
}
169167

170168
/**
@@ -188,17 +186,13 @@ private AbfsBlobClient createBlobClient(final URL baseUrl,
188186
final EncryptionContextProvider encryptionContextProvider,
189187
final AbfsClientContext abfsClientContext) throws IOException {
190188
URL blobUrl = changeUrlFromDfsToBlob(baseUrl);
191-
if (tokenProvider != null) {
192-
LOG.debug("Creating AbfsBlobClient with access token provider using the URL: {}", blobUrl);
193-
return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
194-
tokenProvider, encryptionContextProvider,
195-
abfsClientContext);
196-
} else {
197-
LOG.debug("Creating AbfsBlobClient with SAS token provider using the URL: {}", blobUrl);
198-
return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
199-
sasTokenProvider, encryptionContextProvider,
200-
abfsClientContext);
201-
}
189+
LOG.debug(
190+
"Creating AbfsBlobClient with access token provider: %s and "
191+
+ "SAS token provider: %s using the URL: %s",
192+
tokenProvider, sasTokenProvider, blobUrl);
193+
return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
194+
tokenProvider, sasTokenProvider, encryptionContextProvider,
195+
abfsClientContext);
202196
}
203197

204198
@Override

0 commit comments

Comments
 (0)