Skip to content

Commit cc4e04d

Browse files
authored
Merge pull request #141 from cloudsufi/feature/ConnectionManagement
Feature/connection management
2 parents 709e104 + 02b9796 commit cc4e04d

23 files changed

+969
-162
lines changed

pom.xml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<name>Salesforce plugins</name>
2323
<groupId>io.cdap.plugin</groupId>
2424
<artifactId>salesforce-plugins</artifactId>
25-
<version>1.5.0-SNAPSHOT</version>
25+
<version>1.5.1-SNAPSHOT</version>
2626
<packaging>jar</packaging>
2727
<description>Salesforce Plugins</description>
2828
<url>https://github.com/data-integrations/salesforce</url>
@@ -79,6 +79,7 @@
7979
<testSourceLocation>${project.basedir}/src/test/java/</testSourceLocation>
8080
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
8181
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
82+
<powermock.version>2.0.2</powermock.version>
8283
</properties>
8384

8485
<repositories>
@@ -400,6 +401,18 @@
400401
<version>${mockito.version}</version>
401402
<scope>test</scope>
402403
</dependency>
404+
<dependency>
405+
<groupId>org.powermock</groupId>
406+
<artifactId>powermock-module-junit4</artifactId>
407+
<version>${powermock.version}</version>
408+
<scope>test</scope>
409+
</dependency>
410+
<dependency>
411+
<groupId>org.powermock</groupId>
412+
<artifactId>powermock-api-mockito2</artifactId>
413+
<version>${powermock.version}</version>
414+
<scope>test</scope>
415+
</dependency>
403416
<dependency>
404417
<groupId>com.force.api</groupId>
405418
<artifactId>force-metadata-api</artifactId>

src/main/java/io/cdap/plugin/salesforce/SalesforceConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class SalesforceConstants {
2727
public static final String PROPERTY_CONSUMER_SECRET = "consumerSecret";
2828
public static final String PROPERTY_USERNAME = "username";
2929
public static final String PROPERTY_PASSWORD = "password";
30+
public static final String PLUGIN_NAME = "Salesforce";
3031
public static final String PROPERTY_SECURITY_TOKEN = "securityToken";
3132
public static final String PROPERTY_LOGIN_URL = "loginUrl";
3233
public static final String PROPERTY_OAUTH_INFO = "oAuthInfo";
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Copyright © 2022 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package io.cdap.plugin.salesforce.connector;
17+
18+
import com.sforce.async.AsyncApiException;
19+
import com.sforce.soap.partner.DescribeGlobalResult;
20+
import com.sforce.soap.partner.PartnerConnection;
21+
import com.sforce.soap.partner.QueryResult;
22+
import com.sforce.soap.partner.sobject.SObject;
23+
import com.sforce.ws.ConnectionException;
24+
import io.cdap.cdap.api.annotation.Description;
25+
import io.cdap.cdap.api.annotation.Name;
26+
import io.cdap.cdap.api.annotation.Plugin;
27+
import io.cdap.cdap.api.data.format.StructuredRecord;
28+
import io.cdap.cdap.api.data.schema.Schema;
29+
import io.cdap.cdap.etl.api.FailureCollector;
30+
import io.cdap.cdap.etl.api.batch.BatchSink;
31+
import io.cdap.cdap.etl.api.batch.BatchSource;
32+
import io.cdap.cdap.etl.api.connector.BrowseDetail;
33+
import io.cdap.cdap.etl.api.connector.BrowseEntity;
34+
import io.cdap.cdap.etl.api.connector.BrowseEntityPropertyValue;
35+
import io.cdap.cdap.etl.api.connector.BrowseRequest;
36+
import io.cdap.cdap.etl.api.connector.Connector;
37+
import io.cdap.cdap.etl.api.connector.ConnectorContext;
38+
import io.cdap.cdap.etl.api.connector.ConnectorSpec;
39+
import io.cdap.cdap.etl.api.connector.ConnectorSpecRequest;
40+
import io.cdap.cdap.etl.api.connector.DirectConnector;
41+
import io.cdap.cdap.etl.api.connector.PluginSpec;
42+
import io.cdap.cdap.etl.api.connector.SampleRequest;
43+
import io.cdap.cdap.etl.api.validation.ValidationException;
44+
import io.cdap.plugin.common.ConfigUtil;
45+
import io.cdap.plugin.salesforce.SObjectDescriptor;
46+
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
47+
import io.cdap.plugin.salesforce.SalesforceConstants;
48+
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
49+
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
50+
import io.cdap.plugin.salesforce.plugin.SalesforceConnectorConfig;
51+
import io.cdap.plugin.salesforce.plugin.sink.batch.SalesforceBatchSink;
52+
import io.cdap.plugin.salesforce.plugin.sink.batch.SalesforceSinkConfig;
53+
import io.cdap.plugin.salesforce.plugin.source.batch.MapToRecordTransformer;
54+
import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceBatchSource;
55+
import io.cdap.plugin.salesforce.plugin.source.batch.SoapRecordToMapTransformer;
56+
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
57+
import org.slf4j.Logger;
58+
import org.slf4j.LoggerFactory;
59+
60+
import java.io.IOException;
61+
import java.util.ArrayList;
62+
import java.util.HashMap;
63+
import java.util.List;
64+
import java.util.Map;
65+
66+
/**
67+
* Salesforce Connector Plugin
68+
*/
69+
@Plugin(type = Connector.PLUGIN_TYPE)
70+
@Name(SalesforceConstants.PLUGIN_NAME)
71+
@Description("Connection to access data in Salesforce SObject.")
72+
public class SalesforceConnector implements DirectConnector {
73+
private static final Logger LOG = LoggerFactory.getLogger(SalesforceConnector.class);
74+
private static final String ENTITY_TYPE_OBJECTS = "object";
75+
private static final String LABEL_NAME = "label";
76+
private final SalesforceConnectorConfig config;
77+
private StructuredRecord record;
78+
79+
SalesforceConnector(SalesforceConnectorConfig config) {
80+
this.config = config;
81+
}
82+
83+
@Override
84+
public void test(ConnectorContext connectorContext) throws ValidationException {
85+
FailureCollector collector = connectorContext.getFailureCollector();
86+
config.validate(collector);
87+
}
88+
89+
@Override
90+
public BrowseDetail browse(ConnectorContext connectorContext, BrowseRequest browseRequest) throws IOException {
91+
AuthenticatorCredentials credentials = new AuthenticatorCredentials(config.getUsername(), config.getPassword(),
92+
config.getConsumerKey(),
93+
config.getConsumerSecret(),
94+
config.getLoginUrl());
95+
BrowseDetail.Builder browseDetailBuilder = BrowseDetail.builder();
96+
int count = 0;
97+
try {
98+
PartnerConnection partnerConnection = SalesforceConnectionUtil.getPartnerConnection(credentials);
99+
DescribeGlobalResult dgr = partnerConnection.describeGlobal();
100+
// Loop through the array to get all the objects.
101+
for (int i = 0; i < dgr.getSobjects().length; i++) {
102+
String name = dgr.getSobjects()[i].getName();
103+
String label = dgr.getSobjects()[i].getLabel();
104+
BrowseEntity.Builder entity = (BrowseEntity.builder(name, name, ENTITY_TYPE_OBJECTS).
105+
canBrowse(false).canSample(true));
106+
entity.addProperty(LABEL_NAME, BrowseEntityPropertyValue.builder(label, BrowseEntityPropertyValue.
107+
PropertyType.STRING).build());
108+
browseDetailBuilder.addEntity(entity.build());
109+
count++;
110+
}
111+
} catch (ConnectionException e) {
112+
throw new IOException("Unable to create the connection.", e);
113+
}
114+
return browseDetailBuilder.setTotalCount(count).build();
115+
}
116+
117+
@Override
118+
public ConnectorSpec generateSpec(ConnectorContext connectorContext, ConnectorSpecRequest connectorSpecRequest)
119+
throws IOException {
120+
ConnectorSpec.Builder specBuilder = ConnectorSpec.builder();
121+
Map<String, String> properties = new HashMap<>();
122+
properties.put(io.cdap.plugin.common.ConfigUtil.NAME_USE_CONNECTION, "true");
123+
properties.put(ConfigUtil.NAME_CONNECTION, connectorSpecRequest.getConnectionWithMacro());
124+
String tableName = connectorSpecRequest.getPath();
125+
if (tableName != null) {
126+
properties.put(SalesforceSourceConstants.PROPERTY_SOBJECT_NAME, tableName);
127+
properties.put(SalesforceSinkConfig.PROPERTY_SOBJECT, tableName);
128+
}
129+
AuthenticatorCredentials authenticatorCredentials = config.getAuthenticatorCredentials();
130+
try {
131+
SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromName(tableName, authenticatorCredentials);
132+
Schema schema = SalesforceSchemaUtil.getSchema(authenticatorCredentials, sObjectDescriptor);
133+
specBuilder.setSchema(schema);
134+
} catch (ConnectionException e) {
135+
throw new IOException("Unable to generate Schema", e);
136+
}
137+
return specBuilder.addRelatedPlugin(new PluginSpec(SalesforceBatchSource.NAME, BatchSource.PLUGIN_TYPE,
138+
properties)).
139+
addRelatedPlugin(new PluginSpec(SalesforceBatchSink.PLUGIN_NAME, BatchSink.PLUGIN_TYPE, properties)).build();
140+
}
141+
142+
@Override
143+
public List<StructuredRecord> sample(ConnectorContext connectorContext, SampleRequest sampleRequest)
144+
throws IOException {
145+
String object = sampleRequest.getPath();
146+
if (object == null) {
147+
throw new IllegalArgumentException("Path should contain object");
148+
}
149+
try {
150+
return listObjectDetails(object, sampleRequest.getLimit());
151+
} catch (AsyncApiException | ConnectionException e) {
152+
throw new IOException("unable to fetch records", e);
153+
}
154+
}
155+
156+
private List<StructuredRecord> listObjectDetails(String object, int limit) throws AsyncApiException,
157+
ConnectionException {
158+
List<StructuredRecord> samples = new ArrayList<>();
159+
AuthenticatorCredentials credentials = new AuthenticatorCredentials(config.getUsername(), config.getPassword(),
160+
config.getConsumerKey(),
161+
config.getConsumerSecret(),
162+
config.getLoginUrl());
163+
String fields = getObjectFields(object);
164+
String query = String.format("SELECT %s FROM %s LIMIT %d", fields, object, limit);
165+
SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromQuery(query);
166+
SoapRecordToMapTransformer soapRecordToMapTransformer = new SoapRecordToMapTransformer();
167+
PartnerConnection partnerConnection = SalesforceConnectionUtil.getPartnerConnection(credentials);
168+
QueryResult queryResult = partnerConnection.query(query);
169+
SObject[] sObjects = queryResult.getRecords();
170+
Schema schema = SalesforceSchemaUtil.getSchema(credentials, sObjectDescriptor);
171+
MapToRecordTransformer transformer = new MapToRecordTransformer();
172+
for (int i = 0; i < sObjects.length; i++) {
173+
record = transformer.transform(schema, soapRecordToMapTransformer.transformToMap(sObjects[i], sObjectDescriptor));
174+
samples.add(record);
175+
}
176+
177+
return samples;
178+
}
179+
180+
private String getObjectFields(String object) throws ConnectionException {
181+
SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromName(object, config.getAuthenticatorCredentials(),
182+
SalesforceSchemaUtil.COMPOUND_FIELDS);
183+
List<String> actualFields = sObjectDescriptor.getFieldsNames();
184+
String result = String.join(",", actualFields);
185+
return result;
186+
}
187+
188+
}

src/main/java/io/cdap/plugin/salesforce/plugin/BaseSalesforceConfig.java renamed to src/main/java/io/cdap/plugin/salesforce/plugin/SalesforceConnectorConfig.java

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.cdap.cdap.api.annotation.Description;
2222
import io.cdap.cdap.api.annotation.Macro;
2323
import io.cdap.cdap.api.annotation.Name;
24+
import io.cdap.cdap.api.plugin.PluginConfig;
2425
import io.cdap.cdap.etl.api.FailureCollector;
2526
import io.cdap.plugin.common.ReferenceNames;
2627
import io.cdap.plugin.common.ReferencePluginConfig;
@@ -33,7 +34,7 @@
3334
/**
3435
* Base configuration for Salesforce Streaming and Batch plugins
3536
*/
36-
public class BaseSalesforceConfig extends ReferencePluginConfig {
37+
public class SalesforceConnectorConfig extends PluginConfig {
3738

3839
@Name(SalesforceConstants.PROPERTY_OAUTH_INFO)
3940
@Description("OAuth information for connecting to Salesforce. " +
@@ -80,17 +81,13 @@ public class BaseSalesforceConfig extends ReferencePluginConfig {
8081
@Nullable
8182
private String loginUrl;
8283

83-
private static final String DEFAULT_LOGIN_URL = "https://login.salesforce.com/services/oauth2/token";
84-
85-
public BaseSalesforceConfig(String referenceName,
86-
@Nullable String consumerKey,
84+
public SalesforceConnectorConfig(@Nullable String consumerKey,
8785
@Nullable String consumerSecret,
8886
@Nullable String username,
8987
@Nullable String password,
9088
@Nullable String loginUrl,
9189
@Nullable String securityToken,
9290
@Nullable OAuthInfo oAuthInfo) {
93-
super(referenceName);
9491
this.consumerKey = consumerKey;
9592
this.consumerSecret = consumerSecret;
9693
this.username = username;
@@ -130,27 +127,6 @@ public String getLoginUrl() {
130127
return loginUrl;
131128
}
132129

133-
public String getReferenceNameOrNormalizedFQN(String orgId, String sObject) {
134-
return Strings.isNullOrEmpty(referenceName)
135-
? ReferenceNames.normalizeFqn(getFQN(orgId, sObject))
136-
: referenceName;
137-
}
138-
139-
/**
140-
* Get fully-qualified name (FQN) for a Salesforce object (FQN format: salesforce://prod/orgId.mySobject).
141-
*
142-
* @return String fqn
143-
*/
144-
public String getFQN(String orgId, String sObject) {
145-
String firstFQNPart = loginUrl.equals(DEFAULT_LOGIN_URL) ? "prod" : "sandbox";
146-
return String.format("salesforce://%s/%s.%s", firstFQNPart, orgId, sObject);
147-
}
148-
149-
public String getOrgId() throws ConnectionException {
150-
PartnerConnection partnerConnection = SalesforceConnectionUtil.getPartnerConnection(getAuthenticatorCredentials());
151-
return partnerConnection.getUserInfo().getOrganizationId();
152-
}
153-
154130
public void validate(FailureCollector collector) {
155131
try {
156132
validateConnection();

src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import com.sforce.ws.ConnectionException;
1919
import io.cdap.cdap.api.annotation.Description;
20+
import io.cdap.cdap.api.annotation.Metadata;
21+
import io.cdap.cdap.api.annotation.MetadataProperty;
2022
import io.cdap.cdap.api.annotation.Name;
2123
import io.cdap.cdap.api.annotation.Plugin;
2224
import io.cdap.cdap.api.data.batch.Output;
@@ -30,9 +32,11 @@
3032
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3133
import io.cdap.cdap.etl.api.batch.BatchSink;
3234
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
35+
import io.cdap.cdap.etl.api.connector.Connector;
3336
import io.cdap.plugin.common.Asset;
3437
import io.cdap.plugin.common.LineageRecorder;
3538
import io.cdap.plugin.salesforce.SObjectDescriptor;
39+
import io.cdap.plugin.salesforce.SalesforceConstants;
3640
import org.apache.hadoop.io.NullWritable;
3741
import org.slf4j.Logger;
3842
import org.slf4j.LoggerFactory;
@@ -45,6 +49,7 @@
4549
@Plugin(type = BatchSink.PLUGIN_TYPE)
4650
@Name(SalesforceBatchSink.PLUGIN_NAME)
4751
@Description("Writes records to Salesforce")
52+
@Metadata(properties = {@MetadataProperty(key = Connector.PLUGIN_TYPE, value = SalesforceConstants.PLUGIN_NAME)})
4853
public class SalesforceBatchSink extends BatchSink<StructuredRecord, NullWritable, CSVRecord> {
4954

5055
private static final Logger LOG = LoggerFactory.getLogger(SalesforceBatchSink.class);

src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,26 +51,26 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {
5151
.put(SalesforceSinkConstants.CONFIG_ERROR_HANDLING, config.getErrorHandling().getValue())
5252
.put(SalesforceSinkConstants.CONFIG_MAX_BYTES_PER_BATCH, config.getMaxBytesPerBatch().toString())
5353
.put(SalesforceSinkConstants.CONFIG_MAX_RECORDS_PER_BATCH, config.getMaxRecordsPerBatch().toString());
54-
55-
OAuthInfo oAuthInfo = config.getOAuthInfo();
54+
OAuthInfo oAuthInfo = config.getConnection().getOAuthInfo();
5655
if (oAuthInfo != null) {
5756
configBuilder
5857
.put(SalesforceConstants.CONFIG_OAUTH_TOKEN, oAuthInfo.getAccessToken())
5958
.put(SalesforceConstants.CONFIG_OAUTH_INSTANCE_URL, oAuthInfo.getInstanceURL());
6059
} else {
6160
configBuilder
62-
.put(SalesforceConstants.CONFIG_USERNAME, Objects.requireNonNull(config.getUsername()))
63-
.put(SalesforceConstants.CONFIG_PASSWORD, Objects.requireNonNull(config.getPassword()))
64-
.put(SalesforceConstants.CONFIG_CONSUMER_KEY, Objects.requireNonNull(config.getConsumerKey()))
65-
.put(SalesforceConstants.CONFIG_CONSUMER_SECRET, Objects.requireNonNull(config.getConsumerSecret()))
66-
.put(SalesforceConstants.CONFIG_LOGIN_URL, Objects.requireNonNull(config.getLoginUrl()));
61+
.put(SalesforceConstants.CONFIG_USERNAME, Objects.requireNonNull(config.getConnection().getUsername()))
62+
.put(SalesforceConstants.CONFIG_PASSWORD, Objects.requireNonNull(config.getConnection().getPassword()))
63+
.put(SalesforceConstants.CONFIG_CONSUMER_KEY, Objects.requireNonNull(config.getConnection().getConsumerKey()))
64+
.put(SalesforceConstants.CONFIG_CONSUMER_SECRET, Objects.requireNonNull(config.getConnection().
65+
getConsumerSecret()))
66+
.put(SalesforceConstants.CONFIG_LOGIN_URL, Objects.requireNonNull(config.getConnection().getLoginUrl()));
6767
}
6868

6969
if (config.getExternalIdField() != null) {
7070
configBuilder.put(SalesforceSinkConstants.CONFIG_EXTERNAL_ID_FIELD, config.getExternalIdField());
7171
}
7272

73-
AuthenticatorCredentials credentials = config.getAuthenticatorCredentials();
73+
AuthenticatorCredentials credentials = config.getConnection().getAuthenticatorCredentials();
7474

7575
try {
7676
BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
@@ -81,7 +81,6 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {
8181
} catch (AsyncApiException e) {
8282
throw new RuntimeException("There was issue communicating with Salesforce", e);
8383
}
84-
8584
this.configMap = configBuilder.build();
8685
}
8786

0 commit comments

Comments
 (0)