From 5e510d2cb6e562c1bbea7b22dbe7979184a3e863 Mon Sep 17 00:00:00 2001 From: Rodney Peck Date: Mon, 10 May 2021 22:27:17 -0400 Subject: [PATCH] add HTTP plugin support for service account authentication --- pom.xml | 16 +- .../source/common/BaseHttpSourceConfig.java | 39 ++++ .../http/source/common/http/HttpClient.java | 15 +- .../http/source/common/http/OAuthUtil.java | 221 +++++++++++++++++- .../plugin/http/etl/HttpSourceETLTest.java | 2 + .../pagination/PaginationIteratorTest.java | 1 + widgets/HTTP-batchsource.json | 31 +++ 7 files changed, 315 insertions(+), 10 deletions(-) diff --git a/pom.xml b/pom.xml index d49ef2e..15731dd 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ HTTP Plugins io.cdap http-plugins - 1.4.0-SNAPSHOT + 1.4.1-service-account @@ -82,7 +82,7 @@ 2.8.5 2.3.0 4.5.9 - 2.4.0-SNAPSHOT + 2.3.0-SNAPSHOT 2.9.9 4.11 2.7.1 @@ -93,6 +93,16 @@ + + com.google.guava + guava + 13.0.1 + + + com.google.auth + google-auth-library-oauth2-http + 0.25.5 + io.cdap.cdap cdap-api @@ -354,8 +364,6 @@ jython-standalone ${jython.version} - - io.cdap.cdap diff --git a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java index a554dd6..942a43f 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java +++ b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java @@ -87,6 +87,9 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig { public static final String PROPERTY_CLIENT_SECRET = "clientSecret"; public static final String PROPERTY_SCOPES = "scopes"; public static final String PROPERTY_REFRESH_TOKEN = "refreshToken"; + public static final String PROPERTY_SERVICE_ACCOUNT_ENABLED = "serviceAccountEnabled"; + public static final String PROPERTY_SERVICE_ACCOUNT_JSON = "serviceAccountJson"; + public static final String PROPERTY_SERVICE_ACCOUNT_SCOPE = "serviceAccountScope"; public static final String PROPERTY_VERIFY_HTTPS = "verifyHttps"; public static final String PROPERTY_KEYSTORE_FILE = "keystoreFile"; public static final String PROPERTY_KEYSTORE_TYPE = "keystoreType"; @@ -316,6 +319,23 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig { @Macro protected String refreshToken; + @Name(PROPERTY_SERVICE_ACCOUNT_ENABLED) + @Description("If true, plugin will use service account key to perform oauth2 authentication.") + protected String serviceAccountEnabled; + + @Nullable + @Name(PROPERTY_SERVICE_ACCOUNT_JSON) + @Description("Json key file content for OAuth2 with service account.") + @Macro + protected String serviceAccountJson; + + @Nullable + @Name(PROPERTY_SERVICE_ACCOUNT_SCOPE) + @Description("Scope used when using a service account json key file. " + + "Defaults to https://www.googleapis.com/auth/cloud-platform") + @Macro + protected String serviceAccountScope; + @Name(PROPERTY_VERIFY_HTTPS) @Description("If false, untrusted trust certificates (e.g. self signed), will not lead to an" + "error. Do not disable this in production environment on a network you do not entirely trust. " + @@ -533,6 +553,10 @@ public Boolean getOauth2Enabled() { return Boolean.parseBoolean(oauth2Enabled); } + public Boolean getServiceAccountEnabled() { + return Boolean.parseBoolean(serviceAccountEnabled); + } + @Nullable public String getAuthUrl() { return authUrl; @@ -563,6 +587,16 @@ public String getRefreshToken() { return refreshToken; } + @Nullable + public String getServiceAccountJson() { + return serviceAccountJson; + } + + @Nullable + public String getServiceAccountScope() { + return serviceAccountScope; + } + public Boolean getVerifyHttps() { return Boolean.parseBoolean(verifyHttps); } @@ -794,6 +828,11 @@ PAGINATION_INDEX_PLACEHOLDER, getPaginationType()), assertIsSet(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2); } + if (!containsMacro(PROPERTY_SERVICE_ACCOUNT_ENABLED) && this.getServiceAccountEnabled()) { + String reasonOauth2 = "Service Account is enabled"; + assertIsSet(getServiceAccountJson(), PROPERTY_SERVICE_ACCOUNT_JSON, reasonOauth2); + } + if (!containsMacro(PROPERTY_VERIFY_HTTPS) && !getVerifyHttps()) { assertIsNotSet(getTrustStoreFile(), PROPERTY_TRUSTSTORE_FILE, String.format("trustore settings are ignored due to disabled %s", PROPERTY_VERIFY_HTTPS)); diff --git a/src/main/java/io/cdap/plugin/http/source/common/http/HttpClient.java b/src/main/java/io/cdap/plugin/http/source/common/http/HttpClient.java index bb41293..e4c7105 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/http/HttpClient.java +++ b/src/main/java/io/cdap/plugin/http/source/common/http/HttpClient.java @@ -126,15 +126,20 @@ private CloseableHttpClient createHttpClient() throws IOException { ArrayList
clientHeaders = new ArrayList<>(); + String accessToken = null; // oAuth2 + if (config.getServiceAccountEnabled()) { + accessToken = OAuthUtil.getAccessTokenByServiceAccount(HttpClients.createDefault(), + config.getServiceAccountJson(), + config.getServiceAccountScope()); + } if (config.getOauth2Enabled()) { - String accessToken = OAuthUtil.getAccessTokenByRefreshToken(HttpClients.createDefault(), config.getTokenUrl(), - config.getClientId(), config.getClientSecret(), - config.getRefreshToken()); - clientHeaders.add(new BasicHeader("Authorization", "Bearer " + accessToken)); + accessToken = OAuthUtil.getAccessTokenByRefreshToken(HttpClients.createDefault(), config.getTokenUrl(), + config.getClientId(), config.getClientSecret(), + config.getRefreshToken()); } - + clientHeaders.add(new BasicHeader("Authorization", "Bearer " + accessToken)); // set default headers if (headers != null) { for (Map.Entry headerEntry : this.headers.entrySet()) { diff --git a/src/main/java/io/cdap/plugin/http/source/common/http/OAuthUtil.java b/src/main/java/io/cdap/plugin/http/source/common/http/OAuthUtil.java index 27ac280..46bc11c 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/http/OAuthUtil.java +++ b/src/main/java/io/cdap/plugin/http/source/common/http/OAuthUtil.java @@ -15,22 +15,176 @@ */ package io.cdap.plugin.http.source.common.http; +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpRequestFactory; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.UrlEncodedContent; +import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.JsonObjectParser; +import com.google.api.client.json.gson.GsonFactory; +import com.google.api.client.json.webtoken.JsonWebSignature; +import com.google.api.client.json.webtoken.JsonWebToken; +import com.google.api.client.util.GenericData; +import com.google.api.client.util.SecurityUtils; + +import com.google.auth.http.HttpTransportFactory; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + import io.cdap.plugin.http.source.common.pagination.page.JSONUtil; + import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.utils.URIBuilder; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.StringReader; +import java.io.StringWriter; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; + +import java.security.GeneralSecurityException; +import java.security.KeyFactory; +import java.security.PrivateKey; +import java.security.Signature; +import java.security.spec.PKCS8EncodedKeySpec; + +import java.util.Arrays; +import java.util.Base64; +import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * A class which contains utilities to make OAuth2 specific calls. */ public class OAuthUtil { + + public static PrivateKey readPKCS8Pem(String key) throws Exception { + key = key.replace("-----BEGIN PRIVATE KEY-----", ""); + key = key.replace("-----END PRIVATE KEY-----", ""); + key = key.replaceAll("\\s+", ""); + + // Base64 decode the result + byte [] pkcs8EncodedBytes = decode(key); + + // extract the private key + PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(pkcs8EncodedBytes); + KeyFactory kf = KeyFactory.getInstance("RSA"); + return kf.generatePrivate(keySpec); + } + + public static String encodeBase64URLSafeString(byte[] binaryData) { + if (binaryData == null) { + return null; + } + return Base64.getUrlEncoder().withoutPadding().encodeToString(binaryData); + } + + public static String signUsingRsaSha256( + PrivateKey privateKey, + JsonFactory jsonFactory, + JsonWebSignature.Header header, + JsonWebToken.Payload payload) + throws GeneralSecurityException, IOException { + String head = encodeBase64URLSafeString(jsonFactory.toByteArray(header)); + String content = head + "." + + encodeBase64URLSafeString(jsonFactory.toByteArray(payload)); + byte[] contentBytes = content.getBytes(); + Signature sig = Signature.getInstance("SHA256withRSA"); + sig.initSign(privateKey); + sig.update(contentBytes); + byte[] signature = sig.sign(); + return content + "." + encodeBase64URLSafeString(signature); + } + + + +// copied from https://stackoverflow.com/a/4265472 + private static char[] theALPHABET = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/".toCharArray(); + + private static int[] toInt = new int[128]; + + static { + for (int i = 0; i < theALPHABET.length; i++) { + toInt[theALPHABET[i]] = i; + } + } + + /** + * Translates the specified byte array into Base64 string. + * + * @param buf the byte array (not null) + * @return the translated Base64 string (not null) + */ + public static String encode(byte[] buf) { + int size = buf.length; + char[] ar = new char[((size + 2) / 3) * 4]; + int a = 0; + int i = 0; + while (i < size) { + byte b0 = buf[i++]; + byte b1 = (i < size) ? buf[i++] : 0; + byte b2 = (i < size) ? buf[i++] : 0; + + int mask = 0x3F; + ar[a++] = theALPHABET[(b0 >> 2) & mask]; + ar[a++] = theALPHABET[((b0 << 4) | ((b1 & 0xFF) >> 4)) & mask]; + ar[a++] = theALPHABET[((b1 << 2) | ((b2 & 0xFF) >> 6)) & mask]; + ar[a++] = theALPHABET[b2 & mask]; + } + switch (size % 3) { + case 1: ar[--a] = '='; + break; + case 2: ar[--a] = '='; + ar[--a] = '='; + break; + } + return new String(ar); + } + + /** + * Translates the specified Base64 string into a byte array. + * + * @param s the Base64 string (not null) + * @return the byte array (not null) + */ + public static byte[] decode(String s) { + int delta = s.endsWith("==") ? 2 : s.endsWith("=") ? 1 : 0; + byte[] buffer = new byte[s.length() * 3 / 4 - delta]; + int mask = 0xFF; + int index = 0; + for (int i = 0; i < s.length(); i += 4) { + int c0 = toInt[s.charAt(i)]; + int c1 = toInt[s.charAt(i + 1)]; + buffer[index++] = (byte) (((c0 << 2) | (c1 >> 4)) & mask); + if (index >= buffer.length) { + return buffer; + } + int c2 = toInt[s.charAt(i + 2)]; + buffer[index++] = (byte) (((c1 << 4) | (c2 >> 2)) & mask); + if (index >= buffer.length) { + return buffer; + } + int c3 = toInt[s.charAt(i + 3)]; + buffer[index++] = (byte) (((c2 << 6) | c3) & mask); + } + return buffer; + } + + public static String getAccessTokenByRefreshToken(CloseableHttpClient httpclient, String tokenUrl, String clientId, String clientSecret, String refreshToken) throws IOException { @@ -54,5 +208,70 @@ public static String getAccessTokenByRefreshToken(CloseableHttpClient httpclient JsonElement jsonElement = JSONUtil.toJsonObject(responseString).get("access_token"); return jsonElement.getAsString(); } -} + public static String getAccessTokenByServiceAccount(CloseableHttpClient httpclient, String serviceAccountJson, + String serviceAccountScope) + throws IOException { + HttpTransportFactory transportFactory; + JsonObject sa = JSONUtil.toJsonObject(serviceAccountJson); + String tokenServerUri = sa.get("token_uri").getAsString(); + String scope = serviceAccountScope; + + if (serviceAccountScope == null) { + scope = "https://www.googleapis.com/auth/cloud-platform"; + } + + PrivateKey key; + try { + key = readPKCS8Pem(sa.get("private_key").getAsString()); + } catch (Exception e) { + throw new IOException( + "Error decoding service account private key.", e); + } + long currentTime = System.currentTimeMillis(); + JsonWebSignature.Header header = new JsonWebSignature.Header(); + header.setAlgorithm("RS256"); + header.setType("JWT"); + header.setKeyId(sa.get("private_key_id").getAsString()); + + + JsonWebToken.Payload payload = new JsonWebToken.Payload(); + payload.setIssuer(sa.get("client_email").getAsString()); + payload.setIssuedAtTimeSeconds(currentTime / 1000); + payload.setExpirationTimeSeconds(currentTime / 1000 + 3600); // one hour + payload.setSubject(sa.get("client_email").getAsString()); + payload.put("scope", scope); + payload.setAudience(tokenServerUri); + + String assertion; + try { + assertion = signUsingRsaSha256(key, GsonFactory.getDefaultInstance(), header, payload); + } catch (GeneralSecurityException e) { + throw new IOException( + "Error signing service account access token request with private key.", e); + } + + GenericData tokenRequest = new GenericData(); + tokenRequest.set("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer"); + tokenRequest.set("assertion", assertion); + + UrlEncodedContent content = new UrlEncodedContent(tokenRequest); + HttpRequestFactory requestFactory = new NetHttpTransport().createRequestFactory(); + HttpRequest request = requestFactory.buildPostRequest(new GenericUrl(tokenServerUri), content); + HttpResponse response = request.execute(); + + InputStream in = response.getContent(); + StringWriter out = new StringWriter(); + byte[] buf = new byte[4096]; + int r; + while (true) { + r = in.read(buf); + if (r == -1) { + break; + } + out.write(new String(buf).substring(0, r)); + } + JsonObject responseData = JSONUtil.toJsonObject(out.toString()); + return responseData.get("access_token").toString(); + } +} diff --git a/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java b/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java index 07a594e..aa0ee7f 100644 --- a/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java +++ b/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java @@ -487,6 +487,8 @@ protected Map getProperties(Map sourceProperties .put("referenceName", testName.getMethodName()) .put(BaseHttpSourceConfig.PROPERTY_HTTP_METHOD, "GET") .put(BaseHttpSourceConfig.PROPERTY_OAUTH2_ENABLED, "false") + .put(BaseHttpSourceConfig.PROPERTY_SERVICE_ACCOUNT_ENABLED, "false") + .put(BaseHttpSourceConfig.PROPERTY_SERVICE_ACCOUNT_SCOPE, "") .put(BaseHttpSourceConfig.PROPERTY_HTTP_ERROR_HANDLING, "2..:Success,.*:Fail") .put(BaseHttpSourceConfig.PROPERTY_ERROR_HANDLING, "stopOnError") .put(BaseHttpSourceConfig.PROPERTY_RETRY_POLICY, "linear") diff --git a/src/test/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorTest.java b/src/test/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorTest.java index 16d22b5..84f1f30 100644 --- a/src/test/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorTest.java +++ b/src/test/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorTest.java @@ -333,6 +333,7 @@ static class BaseTestConfig extends HttpBatchSourceConfig { this.url = ""; this.httpMethod = "GET"; this.oauth2Enabled = "false"; + this.serviceAccountEnabled = "false"; this.httpErrorsHandling = "2..:Success,.*:Fail"; this.retryPolicy = "linear"; this.maxRetryDuration = 10L; diff --git a/widgets/HTTP-batchsource.json b/widgets/HTTP-batchsource.json index 6c05378..a172416 100644 --- a/widgets/HTTP-batchsource.json +++ b/widgets/HTTP-batchsource.json @@ -153,6 +153,37 @@ } ] }, + { + "label": "OAuth2 Service Account", + "properties": [ + { + "widget-type": "toggle", + "label": "OAuth2 Service Account Enabled", + "name": "serviceAccountEnabled", + "widget-attributes": { + "default": "false", + "on": { + "label": "True", + "value": "true" + }, + "off": { + "label": "False", + "value": "false" + } + } + }, + { + "widget-type": "textbox", + "label": "Service Account key JSON", + "name": "serviceAccountJson" + }, + { + "widget-type": "textbox", + "label": "Scopes", + "name": "serviceAccountScope" + } + ] + }, { "label": "Basic Authentication", "properties": [