Skip to content
1 change: 1 addition & 0 deletions docs/HTTP-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ can be omitted as long as the field is present in schema.

### Authentication
* **OAuth2**
* **Grant Type:** Value of grant type to determine the OAuth mechanism.
* **Auth URL:** Endpoint for the authorization server used to retrieve the authorization code.
* **Token URL:** Endpoint for the resource server, which exchanges the authorization code for an access token.
* **Client ID:** Client identifier obtained during the Application registration process.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.cdap.plugin.http.source.common.error.HttpErrorHandlerEntity;
import io.cdap.plugin.http.source.common.error.RetryableErrorHandling;
import io.cdap.plugin.http.source.common.http.AuthType;
import io.cdap.plugin.http.source.common.http.HttpClient;
import io.cdap.plugin.http.source.common.http.GrantType;
import io.cdap.plugin.http.source.common.http.KeyStoreType;
import io.cdap.plugin.http.source.common.http.OAuthUtil;
import io.cdap.plugin.http.source.common.pagination.PaginationIteratorFactory;
Expand Down Expand Up @@ -118,6 +118,13 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig {

public static final String PAGINATION_INDEX_PLACEHOLDER_REGEX = "\\{pagination.index\\}";
public static final String PAGINATION_INDEX_PLACEHOLDER = "{pagination.index}";
public static final String PROPERTY_GRANT_TYPE = "grantType";
public static final String PROPERTY_GRANT_TYPE_LABEL = "Grant type";
public static final String PARAMETER_CLIENT_ID = "client_id";
public static final String PARAMETER_CLIENT_SECRET = "client_secret";
public static final String PARAMETER_REFRESH_TOKEN = "refresh_token";
public static final String PARAMETER_GRANT_TYPE = "grant_type";
public static final String PARAMETER_ACCESS_TOKEN = "access_token";

@Name(PROPERTY_URL)
@Description("Url to fetch to the first page. The url must start with a protocol (e.g. http://).")
Expand Down Expand Up @@ -442,6 +449,11 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig {
@Description("Output schema. Is required to be set.")
protected String schema;

@Nullable
@Name(PROPERTY_GRANT_TYPE)
@Description("Value of grant type to determine the OAuth mechanism")
protected String grantType;

protected BaseHttpSourceConfig(String referenceName) {
super(referenceName);
}
Expand Down Expand Up @@ -719,6 +731,10 @@ public Schema getSchema() {
schema, e, PROPERTY_SCHEMA);
}
}
@Nullable
public String getGrantType() {
return grantType;
}

@Nullable
public Map<String, String> getHeadersMap() {
Expand Down Expand Up @@ -798,7 +814,6 @@ public void validate(FailureCollector failureCollector) {
String.format("URL value is not valid: '%s'", getUrl()), e, PROPERTY_URL);
}
}

// Validate Linear Retry Interval
if (!containsMacro(PROPERTY_RETRY_POLICY) && getRetryPolicy() == RetryPolicy.LINEAR) {
assertIsSet(getLinearRetryInterval(), PROPERTY_LINEAR_RETRY_INTERVAL, "retry policy is linear");
Expand Down Expand Up @@ -883,20 +898,22 @@ PAGINATION_INDEX_PLACEHOLDER, getPaginationType()),
// Validate OAuth2 properties
if (!containsMacro(PROPERTY_OAUTH2_ENABLED) && this.getOauth2Enabled()) {
String reasonOauth2 = "OAuth2 is enabled";
assertIsSet(getAuthUrl(), PROPERTY_AUTH_URL, reasonOauth2);
assertIsSet(getTokenUrl(), PROPERTY_TOKEN_URL, reasonOauth2);
assertIsSet(getClientId(), PROPERTY_CLIENT_ID, reasonOauth2);
assertIsSet(getClientSecret(), PROPERTY_CLIENT_SECRET, reasonOauth2);
assertIsSet(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2);
assertIsSet(getGrantType(), PROPERTY_GRANT_TYPE, reasonOauth2);

// refresh token validate
if (getGrantType() == GrantType.REFRESH_TOKEN.getValue()) {
assertIsSet(getAuthUrl(), PROPERTY_AUTH_URL, reasonOauth2);
assertIsSet(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2);
}
}
// Validate Authentication properties
AuthType authType = getAuthType();
switch (authType) {
case OAUTH2:
String reasonOauth2 = "OAuth2 is enabled";
if (!containsMacro(PROPERTY_AUTH_URL)) {
assertIsSet(getAuthUrl(), PROPERTY_AUTH_URL, reasonOauth2);
}
if (!containsMacro(PROPERTY_TOKEN_URL)) {
assertIsSet(getTokenUrl(), PROPERTY_TOKEN_URL, reasonOauth2);
}
Expand All @@ -906,8 +923,16 @@ PAGINATION_INDEX_PLACEHOLDER, getPaginationType()),
if (!containsMacro((PROPERTY_CLIENT_SECRET))) {
assertIsSet(getClientSecret(), PROPERTY_CLIENT_SECRET, reasonOauth2);
}
if (!containsMacro(PROPERTY_REFRESH_TOKEN)) {
assertIsSet(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2);
if (!containsMacro(PROPERTY_GRANT_TYPE)) {
assertIsSet(getGrantType(), PROPERTY_GRANT_TYPE, reasonOauth2);
if (getGrantType() == GrantType.REFRESH_TOKEN.getValue()) {
if (!containsMacro(PROPERTY_REFRESH_TOKEN)) {
assertIsSet(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2);
}
if (!containsMacro(PROPERTY_AUTH_URL)) {
assertIsSet(getAuthUrl(), PROPERTY_AUTH_URL, reasonOauth2);
}
}
}
break;
case SERVICE_ACCOUNT:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.http.source.common.http;

import io.cdap.plugin.http.source.common.BaseHttpSourceConfig;
import io.cdap.plugin.http.source.common.exceptions.InvalidPropertyTypeException;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* Enum for different grant type methods.
*/
public enum GrantType {
REFRESH_TOKEN("refresh_token"),
CLIENT_CREDENTIALS("client_credentials");

private final String value;

GrantType(String value) {
this.value = value;
}

public String getValue() {
return value;
}

/**
* Returns the GrantType.
*
* @param value the value is string type.
* @return The GrantType
*/
public static GrantType fromValue(String value) {
return Arrays.stream(GrantType.values()).filter(grantType -> grantType.getValue().equals(value))
.findAny().orElseThrow(() -> new InvalidPropertyTypeException(BaseHttpSourceConfig.PROPERTY_GRANT_TYPE_LABEL,
value, getAllowedValues()));
}

public static List<String> getAllowedValues() {
return Arrays.stream(GrantType.values()).map(v -> v.getValue())
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,15 @@ private CloseableHttpClient createHttpClient() throws IOException {

switch (authType) {
case OAUTH2:
String accessToken = OAuthUtil.getAccessTokenByRefreshToken(HttpClients.createDefault(), config.getTokenUrl(),
config.getClientId(), config.getClientSecret(),
config.getRefreshToken());
String accessToken = null;
if (config.getGrantType() == GrantType.REFRESH_TOKEN.getValue()) {
accessToken = OAuthUtil.getAccessTokenByRefreshToken(HttpClients.createDefault(), config.getTokenUrl(),
config.getClientId(), config.getClientSecret(),
config.getRefreshToken(), config.getGrantType());
} else if (config.getGrantType() == GrantType.CLIENT_CREDENTIALS.getValue()) {
accessToken = OAuthUtil.getAccessTokenByClientCredentials(HttpClients.createDefault(), config.getTokenUrl(),
config.getClientId(), config.getClientSecret(), config.getGrantType());
}
clientHeaders.add(new BasicHeader("Authorization", "Bearer " + accessToken));
break;
case SERVICE_ACCOUNT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;

import java.io.ByteArrayInputStream;
Expand All @@ -34,22 +35,23 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
* A class which contains utilities to make OAuth2 specific calls.
*/
public class OAuthUtil {
public static String getAccessTokenByRefreshToken(CloseableHttpClient httpclient, String tokenUrl, String clientId,
String clientSecret, String refreshToken)
String clientSecret, String refreshToken, String grantType)
throws IOException {

URI uri;
try {
uri = new URIBuilder(tokenUrl)
.setParameter("client_id", clientId)
.setParameter("client_secret", clientSecret)
.setParameter("refresh_token", refreshToken)
.setParameter("grant_type", "refresh_token")
.setParameter(BaseHttpSourceConfig.PARAMETER_CLIENT_ID, clientId)
.setParameter(BaseHttpSourceConfig.PARAMETER_CLIENT_SECRET, clientSecret)
.setParameter(BaseHttpSourceConfig.PARAMETER_REFRESH_TOKEN, refreshToken)
.setParameter(BaseHttpSourceConfig.PARAMETER_GRANT_TYPE, grantType)
.build();
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Failed to build token URI for OAuth2", e);
Expand All @@ -59,7 +61,7 @@ public static String getAccessTokenByRefreshToken(CloseableHttpClient httpclient
CloseableHttpResponse response = httpclient.execute(httppost);
String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");

JsonElement jsonElement = JSONUtil.toJsonObject(responseString).get("access_token");
JsonElement jsonElement = JSONUtil.toJsonObject(responseString).get(BaseHttpSourceConfig.PARAMETER_ACCESS_TOKEN);
if (jsonElement == null) {
throw new IllegalArgumentException("Access token not found");
}
Expand Down Expand Up @@ -95,5 +97,29 @@ public static String getAccessTokenByServiceAccount(BaseHttpSourceConfig config)
}
return accessToken;
}

public static String getAccessTokenByClientCredentials(CloseableHttpClient httpclient, String tokenUrl,
String clientId, String clientSecret, String grantType)
throws IOException {
URI uri;
try {
uri = new URIBuilder(tokenUrl).setParameter(BaseHttpSourceConfig.PARAMETER_GRANT_TYPE, grantType).build();
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Failed to build token URI for OAuth2", e);
}

HttpPost httppost = new HttpPost(uri);
httppost.addHeader(new BasicHeader("Authorization", "Basic " + getBase64EncodeValue(clientId, clientSecret)));
httppost.addHeader(new BasicHeader("Content-Type", "application/json"));
CloseableHttpResponse response = httpclient.execute(httppost);
String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");

JsonElement jsonElement = JSONUtil.toJsonObject(responseString).get(BaseHttpSourceConfig.PARAMETER_ACCESS_TOKEN);
return jsonElement.getAsString();
}

private static String getBase64EncodeValue(String clientId, String clientSecret) {
return Base64.getEncoder().encodeToString((clientId + ":" + clientSecret).getBytes(StandardCharsets.UTF_8));
}
}

34 changes: 29 additions & 5 deletions widgets/HTTP-batchsource.json
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@
]
}
},
{
"widget-type": "select",
"label": "Grant Type",
"name": "grantType",
"widget-attributes" : {
"values": [
"refresh_token",
"client_credentials"
],
"default" : "refresh_token"
}
},
{
"widget-type": "textbox",
"label": "Auth URL",
Expand Down Expand Up @@ -683,10 +695,6 @@
"value": "oAuth2"
},
"show": [
{
"name": "authUrl",
"type": "property"
},
{
"name": "tokenUrl",
"type": "property"
Expand All @@ -704,7 +712,7 @@
"type": "property"
},
{
"name": "refreshToken",
"name": "grantType",
"type": "property"
}
]
Expand Down Expand Up @@ -780,6 +788,22 @@
"type": "property"
}
]
},
{
"name": "Grant Type Refresh Token",
"condition": {
"expression": "grantType == 'refresh_token' && authType == 'oAuth2'"
},
"show": [
{
"name": "authUrl",
"type": "property"
},
{
"name": "refreshToken",
"type": "property"
}
]
}
]
}