diff --git a/src/main/java/io/cdap/plugin/http/common/HttpErrorDetailsProvider.java b/src/main/java/io/cdap/plugin/http/common/HttpErrorDetailsProvider.java new file mode 100644 index 00000000..f16dccf0 --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/common/HttpErrorDetailsProvider.java @@ -0,0 +1,120 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.http.common; + +import com.google.common.base.Throwables; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.cdap.etl.api.exception.ErrorContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; +import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException; + +import java.util.List; + +import java.util.NoSuchElementException; + +/** + * Error details provided for the HTTP + **/ +public class HttpErrorDetailsProvider implements ErrorDetailsProvider { + @Override + public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) { + List causalChain = Throwables.getCausalChain(e); + for (Throwable t : causalChain) { + if (t instanceof ProgramFailureException) { + // if causal chain already has program failure exception, return null to avoid double wrap. + return null; + } + if (t instanceof IllegalArgumentException) { + return getProgramFailureException((IllegalArgumentException) t, errorContext); + } + if (t instanceof IllegalStateException) { + return getProgramFailureException((IllegalStateException) t, errorContext); + } + if (t instanceof InvalidConfigPropertyException) { + return getProgramFailureException((InvalidConfigPropertyException) t, errorContext); + } + if (t instanceof NoSuchElementException) { + return getProgramFailureException((NoSuchElementException) t, errorContext); + } + } + return null; + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link IllegalArgumentException}. + * + * @param e The IllegalArgumentException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e); + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link IllegalStateException}. + * + * @param e The IllegalStateException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e); + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link InvalidConfigPropertyException}. + * + * @param e The InvalidConfigPropertyException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(InvalidConfigPropertyException e, + ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e); + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link NoSuchElementException}. + * + * @param e The NoSuchElementException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(NoSuchElementException e, ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e); + } +} diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java index 867d46be..45cd88d4 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java @@ -37,12 +37,16 @@ public class HTTPOutputFormat extends OutputFormat getRecordWriter(TaskAttemptContext context) - throws IOException { + public RecordWriter getRecordWriter(TaskAttemptContext context) { Configuration hConf = context.getConfiguration(); HTTPSinkConfig config = GSON.fromJson(hConf.get(CONFIG_KEY), HTTPSinkConfig.class); - Schema inputSchema = Schema.parseJson(hConf.get(INPUT_SCHEMA_KEY)); - return new HTTPRecordWriter(config, inputSchema); + Schema inputSchema; + try { + inputSchema = Schema.parseJson(hConf.get(INPUT_SCHEMA_KEY)); + return new HTTPRecordWriter(config, inputSchema); + } catch (IOException e) { + throw new IllegalStateException("Unable to parse the input schema. Reason: " + e.getMessage(), e); + } } @Override diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java index 2180737b..848637d1 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java @@ -17,10 +17,12 @@ package io.cdap.plugin.http.sink.batch; import com.google.auth.oauth2.AccessToken; -import com.google.common.base.Charsets; import com.google.common.base.Strings; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.http.common.RetryPolicy; import io.cdap.plugin.http.common.error.ErrorHandling; import io.cdap.plugin.http.common.error.HttpErrorHandler; @@ -55,10 +57,10 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; -import java.net.ProtocolException; import java.net.URI; import java.net.URL; import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.cert.X509Certificate; @@ -72,7 +74,6 @@ import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; @@ -90,9 +91,9 @@ public class HTTPRecordWriter extends RecordWriter placeHolderList; + private final List placeHolderList; private final Map headers; private AccessToken accessToken; @@ -115,14 +116,14 @@ public class HTTPRecordWriter extends RecordWriter duration.multiply(2), - Duration.FIVE_HUNDRED_MILLISECONDS); + Duration.FIVE_HUNDRED_MILLISECONDS); } url = config.getUrl(); placeHolderList = getPlaceholderListFromURL(); } @Override - public void write(StructuredRecord input, StructuredRecord unused) throws IOException { + public void write(StructuredRecord input, StructuredRecord unused) { configURL = url; if (config.getMethod().equals(REQUEST_METHOD_POST) || config.getMethod().equals(REQUEST_METHOD_PUT) || config.getMethod().equals(REQUEST_METHOD_PATCH)) { @@ -130,8 +131,7 @@ public void write(StructuredRecord input, StructuredRecord unused) throws IOExce } if (config.getMethod().equals(REQUEST_METHOD_PUT) || config.getMethod().equals(REQUEST_METHOD_PATCH) || - config.getMethod().equals(REQUEST_METHOD_DELETE) - && !placeHolderList.isEmpty()) { + config.getMethod().equals(REQUEST_METHOD_DELETE) && !placeHolderList.isEmpty()) { configURL = updateURLWithPlaceholderValue(input); } @@ -151,7 +151,7 @@ public void close(TaskAttemptContext taskAttemptContext) throws IOException, Int private void disableSSLValidation() { TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() { public java.security.cert.X509Certificate[] getAcceptedIssuers() { - return null; + return new X509Certificate[0]; } public void checkClientTrusted(X509Certificate[] certs, String authType) { @@ -169,76 +169,92 @@ public void checkServerTrusted(X509Certificate[] certs, String authType) { throw new IllegalStateException("Error while installing the trust manager: " + e.getMessage(), e); } HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.getSocketFactory()); - HostnameVerifier allHostsValid = new HostnameVerifier() { - public boolean verify(String hostname, SSLSession session) { - return true; - } - }; + HostnameVerifier allHostsValid = (hostname, session) -> true; HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid); } - private boolean executeHTTPServiceAndCheckStatusCode() throws IOException { + private boolean executeHTTPServiceAndCheckStatusCode() { LOG.debug("HTTP Request Attempt No. : {}", ++retryCount); - CloseableHttpClient httpClient = createHttpClient(configURL); - CloseableHttpResponse response = null; + // Try-with-resources ensures proper resource management + try (CloseableHttpClient httpClient = createHttpClient(configURL); + CloseableHttpResponse response = executeHttpRequest(httpClient, new URL(configURL))) { + httpStatusCode = response.getStatusLine().getStatusCode(); + LOG.debug("Response HTTP Status code: {}", httpStatusCode); + httpResponseBody = new HttpResponse(response).getBody(); + + RetryableErrorHandling errorHandlingStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode); + boolean shouldRetry = errorHandlingStrategy.shouldRetry(); + + if (!shouldRetry) { + messageBuffer.clear(); + retryCount = 0; + } + return !shouldRetry; + + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Invalid URL: " + configURL, e); + } catch (IOException e) { + LOG.warn("Error making {} request to URL {}.", config.getMethod(), config.getUrl()); + String errorMessage = "Unable to make request. "; + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, e); + } + } + + private CloseableHttpResponse executeHttpRequest(CloseableHttpClient httpClient, URL url) { try { - URL url = new URL(configURL); - HttpEntityEnclosingRequestBase request = new HttpRequest(URI.create(String.valueOf(url)), - config.getMethod()); - - if (url.getProtocol().equalsIgnoreCase("https")) { - // Disable SSLv3 - System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2"); - if (config.getDisableSSLValidation()) { - disableSSLValidation(); - } + HttpEntityEnclosingRequestBase request = new HttpRequest(URI.create(url.toString()), config.getMethod()); + + if ("https".equalsIgnoreCase(url.getProtocol())) { + configureHttpsSettings(); } if (!messageBuffer.isEmpty()) { String requestBodyString = messageBuffer.getMessage(); if (requestBodyString != null) { - StringEntity requestBody = new StringEntity(requestBodyString, Charsets.UTF_8.toString()); + StringEntity requestBody = new StringEntity(requestBodyString, StandardCharsets.UTF_8.name()); request.setEntity(requestBody); } } request.setHeaders(getRequestHeaders()); - response = httpClient.execute(request); - httpStatusCode = response.getStatusLine().getStatusCode(); - LOG.debug("Response HTTP Status code: {}", httpStatusCode); - httpResponseBody = new HttpResponse(response).getBody(); + // Execute the request and return the response + return httpClient.execute(request); - } catch (MalformedURLException | ProtocolException e) { - throw new IllegalStateException("Error opening url connection. Reason: " + e.getMessage(), e); + } catch (UnsupportedEncodingException e) { + throw new IllegalStateException("Error encoding the request Reason: " + e.getMessage(), e); } catch (IOException e) { - LOG.warn("Error making {} request to url {}.", config.getMethod(), config.getUrl()); - } finally { - if (response != null) { - response.close(); - } - } - RetryableErrorHandling errorHandlingStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode); - boolean shouldRetry = errorHandlingStrategy.shouldRetry(); - if (!shouldRetry) { - messageBuffer.clear(); - retryCount = 0; + String errorMessage = String.format("Unable to execute HTTP request to %s.", url); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, new IOException(errorMessage)); + } catch (Exception e) { + String errorMessage = String.format("Unexpected error occurred while executing HTTP request to URL: %s", url); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, + errorMessage, ErrorType.UNKNOWN, true, e); } - return !shouldRetry; } + private void configureHttpsSettings() { + System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2"); + if (Boolean.TRUE.equals(config.getDisableSSLValidation())) { + disableSSLValidation(); + } + } - public CloseableHttpClient createHttpClient(String pageUriStr) throws IOException { + public CloseableHttpClient createHttpClient(String pageUriStr) { HttpClientBuilder httpClientBuilder = HttpClientBuilder.create(); // set timeouts - Long connectTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getConnectTimeout()); - Long readTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getReadTimeout()); + long connectTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getConnectTimeout()); + long readTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getReadTimeout()); RequestConfig.Builder requestBuilder = RequestConfig.custom(); - requestBuilder.setSocketTimeout(readTimeoutMillis.intValue()); - requestBuilder.setConnectTimeout(connectTimeoutMillis.intValue()); - requestBuilder.setConnectionRequestTimeout(connectTimeoutMillis.intValue()); + requestBuilder.setSocketTimeout((int) readTimeoutMillis); + requestBuilder.setConnectTimeout((int) connectTimeoutMillis); + requestBuilder.setConnectionRequestTimeout((int) connectTimeoutMillis); httpClientBuilder.setDefaultRequestConfig(requestBuilder.build()); // basic auth @@ -274,23 +290,20 @@ private Header[] getRequestHeaders() throws IOException { if (accessToken != null) { Header authorizationHeader = getAuthorizationHeader(accessToken); - if (authorizationHeader != null) { - clientHeaders.add(authorizationHeader); - } + clientHeaders.add(authorizationHeader); } headers.put("Request-Method", config.getMethod().toUpperCase()); headers.put("Instance-Follow-Redirects", String.valueOf(config.getFollowRedirects())); headers.put("charset", config.getCharset()); - if (config.getMethod().equals(REQUEST_METHOD_POST) + if ((config.getMethod().equals(REQUEST_METHOD_POST) || config.getMethod().equals(REQUEST_METHOD_PATCH) - || config.getMethod().equals(REQUEST_METHOD_PUT)) { - if (!headers.containsKey("Content-Type")) { - headers.put("Content-Type", contentType); - } + || config.getMethod().equals(REQUEST_METHOD_PUT)) && !headers.containsKey("Content-Type")) { + headers.put("Content-Type", contentType); } + // set default headers if (headers != null) { for (Map.Entry headerEntry : this.headers.entrySet()) { @@ -356,8 +369,10 @@ private void flushMessageBuffer() { .timeout(config.getMaxRetryDuration(), TimeUnit.SECONDS) .until(this::executeHTTPServiceAndCheckStatusCode); } catch (Exception e) { - throw new RuntimeException("Error while executing http request for remaining input messages " + - "after the batch execution. " + e); + String errorMessage = "Error while executing http request for remaining input messages" + + " after the batch execution."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, new RuntimeException(errorMessage)); } messageBuffer.clear(); @@ -369,11 +384,11 @@ private void flushMessageBuffer() { break; case STOP: throw new IllegalStateException(String.format("Fetching from url '%s' returned status code '%d' and body '%s'", - config.getUrl(), httpStatusCode, httpResponseBody)); + config.getUrl(), httpStatusCode, httpResponseBody)); case SKIP: case SEND: LOG.warn(String.format("Fetching from url '%s' returned status code '%d' and body '%s'", - config.getUrl(), httpStatusCode, httpResponseBody)); + config.getUrl(), httpStatusCode, httpResponseBody)); break; default: throw new IllegalArgumentException(String.format("Unexpected http error handling: '%s'", postRetryStrategy)); diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java index f33036f5..a73c472d 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java @@ -30,12 +30,15 @@ import io.cdap.cdap.etl.api.StageConfigurer; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.http.common.HttpErrorDetailsProvider; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -45,7 +48,7 @@ @Name("HTTP") @Description("Sink plugin to send the messages from the pipeline to an external http endpoint.") public class HTTPSink extends BatchSink { - private HTTPSinkConfig config; + private final HTTPSinkConfig config; public HTTPSink(HTTPSinkConfig config) { this.config = config; @@ -73,13 +76,22 @@ public void prepareRun(BatchSinkContext context) { .setFqn(config.getUrl()).build(); LineageRecorder lineageRecorder = new LineageRecorder(context, asset); lineageRecorder.createExternalDataset(context.getInputSchema()); - List fields = inputSchema == null ? - Collections.emptyList() : - inputSchema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()); + List fields; + if (inputSchema == null) { + fields = Collections.emptyList(); + } else { + fields = Objects.requireNonNull(Objects.requireNonNull(inputSchema).getFields()).stream() + .map(Schema.Field::getName).collect(Collectors.toList()); + } lineageRecorder.recordWrite("Write", String.format("Wrote to HTTP '%s'", config.getUrl()), fields); + // set error details provider + context.setErrorDetailsProvider( + new ErrorDetailsProviderSpec(HttpErrorDetailsProvider.class.getName())); + context.addOutput(Output.of(config.getReferenceNameOrNormalizedFQN(), - new HTTPSink.HTTPOutputFormatProvider(config, inputSchema))); + new HTTPSink.HTTPOutputFormatProvider(config, inputSchema))); + } /** diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java index cb3cc9ff..47575aca 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java @@ -80,7 +80,7 @@ public class HTTPSinkConfig extends BaseHttpConfig { private static final String REGEX_HASHED_VAR = "#(\\w+)"; private static final String PLACEHOLDER = "#"; private static final Set METHODS = ImmutableSet.of(HttpMethod.GET, HttpMethod.POST, - HttpMethod.PUT, HttpMethod.DELETE, "PATCH"); + HttpMethod.PUT, HttpMethod.DELETE, "PATCH"); @Name(URL) @Description("The URL to post data to. Additionally, a placeholder like #columnName can be added to the URL that " + @@ -178,7 +178,7 @@ public class HTTPSinkConfig extends BaseHttpConfig { @Description("Interval in seconds between retries. Is only used if retry policy is \"linear\".") @Macro protected Long linearRetryInterval; - + @Nullable @Name(PROPERTY_MAX_RETRY_DURATION) @Description("Maximum time in seconds retries can take. Default value is 600 seconds (10 minute).") @@ -204,7 +204,7 @@ public HTTPSinkConfig(String referenceName, String url, String method, Integer b @Nullable String requestHeaders, String charset, boolean followRedirects, boolean disableSSLValidation, @Nullable String httpErrorsHandling, String errorHandling, String retryPolicy, @Nullable Long linearRetryInterval, - Long maxRetryDuration, @Nullable int readTimeout, @Nullable int connectTimeout, + Long maxRetryDuration, int readTimeout, int connectTimeout, String oauth2Enabled, String authType, @Nullable String jsonBatchKey, Boolean writeJsonAsArray) { super(referenceName); @@ -393,8 +393,8 @@ public List getHttpErrorHandlingEntries() { String regex = entry.getKey(); try { results.add(new HttpErrorHandlerEntity(Pattern.compile(regex), - getEnumValueByString(RetryableErrorHandling.class, - entry.getValue(), PROPERTY_HTTP_ERROR_HANDLING))); + getEnumValueByString(RetryableErrorHandling.class, + entry.getValue(), PROPERTY_HTTP_ERROR_HANDLING))); } catch (PatternSyntaxException e) { // We embed causing exception message into this one. Since this message is shown on UI when validation fails. throw new InvalidConfigPropertyException( @@ -449,7 +449,7 @@ public void validate(FailureCollector collector) { if (!containsMacro(METHOD) && !METHODS.contains(method.toUpperCase())) { collector.addFailure( - String.format("Invalid request method %s, must be one of %s.", method, Joiner.on(',').join(METHODS)), null) + String.format("Invalid request method %s, must be one of %s.", method, Joiner.on(',').join(METHODS)), null) .withConfigProperty(METHOD); } @@ -492,15 +492,15 @@ public void validateSchema(@Nullable Schema schema, FailureCollector collector) if (containsMacro(URL) || containsMacro(METHOD)) { return; } - + if ((method.equals("PUT") || method.equals("PATCH") || method.equals("DELETE")) && url.contains(PLACEHOLDER)) { Pattern pattern = Pattern.compile(REGEX_HASHED_VAR); Matcher matcher = pattern.matcher(url); - List fieldNames = fields.stream().map(field -> field.getName()).collect(Collectors.toList()); + List fieldNames = fields.stream().map(Schema.Field::getName).collect(Collectors.toList()); while (matcher.find()) { if (!fieldNames.contains(matcher.group(1))) { collector.addFailure(String.format("Schema must contain '%s' field mentioned in the url", matcher.group(1)), - null).withConfigProperty(URL); + null).withConfigProperty(URL); } } } diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java b/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java index 63725d3f..4c4e5ac6 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Objects; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -62,8 +63,8 @@ public class MessageBuffer { * @param customMessageBody The custom message body to be used. */ public MessageBuffer( - MessageFormatType messageFormat, String jsonBatchKey, boolean shouldWriteJsonAsArray, - String delimiterForMessages, String charset, String customMessageBody, Schema inputSchema + MessageFormatType messageFormat, String jsonBatchKey, boolean shouldWriteJsonAsArray, + String delimiterForMessages, String charset, String customMessageBody, Schema inputSchema ) { this.jsonBatchKey = jsonBatchKey; this.delimiterForMessages = delimiterForMessages; @@ -91,16 +92,16 @@ public MessageBuffer( // field name and the array of records as the value Schema bufferRecordArraySchema = Schema.arrayOf(inputSchema); wrappedMessageSchema = Schema.recordOf("wrapper", - Schema.Field.of(jsonBatchKey, bufferRecordArraySchema)); + Schema.Field.of(jsonBatchKey, bufferRecordArraySchema)); } /** * Adds a record to the buffer. * - * @param record The record to be added. + * @param structuredRecord The record to be added. */ - public void add(StructuredRecord record) { - buffer.add(record); + public void add(StructuredRecord structuredRecord) { + buffer.add(structuredRecord); } /** @@ -148,40 +149,40 @@ private String formatAsJson(List buffer) { private String formatAsJsonInternal(List buffer) throws IOException { boolean useJsonBatchKey = !Strings.isNullOrEmpty(jsonBatchKey); - if (!shouldWriteJsonAsArray || !useJsonBatchKey) { + if (Boolean.TRUE.equals(!shouldWriteJsonAsArray) || !useJsonBatchKey) { return getBufferAsJsonList(); } StructuredRecord wrappedMessageRecord = StructuredRecord.builder(wrappedMessageSchema) - .set(jsonBatchKey, buffer).build(); + .set(jsonBatchKey, buffer).build(); return StructuredRecordStringConverter.toJsonString(wrappedMessageRecord); } private String formatAsForm(List buffer) { return buffer.stream() - .map(this::createFormMessage) - .collect(Collectors.joining(delimiterForMessages)); + .map(this::createFormMessage) + .collect(Collectors.joining(delimiterForMessages)); } private String formatAsCustom(List buffer) { return buffer.stream() - .map(this::createCustomMessage) - .collect(Collectors.joining(delimiterForMessages)); + .map(this::createCustomMessage) + .collect(Collectors.joining(delimiterForMessages)); } private String getBufferAsJsonList() throws IOException { StringBuilder sb = new StringBuilder(); - String delimiter = shouldWriteJsonAsArray ? "," : delimiterForMessages; - if (shouldWriteJsonAsArray) { + String delimiter = Boolean.TRUE.equals(shouldWriteJsonAsArray) ? "," : delimiterForMessages; + if (Boolean.TRUE.equals(shouldWriteJsonAsArray)) { sb.append("["); } - for (StructuredRecord record : buffer) { - sb.append(StructuredRecordStringConverter.toJsonString(record)); + for (StructuredRecord structuredRecord : buffer) { + sb.append(StructuredRecordStringConverter.toJsonString(structuredRecord)); sb.append(delimiter); } if (!buffer.isEmpty()) { sb.setLength(sb.length() - delimiter.length()); } - if (shouldWriteJsonAsArray) { + if (Boolean.TRUE.equals(shouldWriteJsonAsArray)) { sb.append("]"); } return sb.toString(); @@ -190,16 +191,18 @@ private String getBufferAsJsonList() throws IOException { private String createFormMessage(StructuredRecord input) { boolean first = true; String formMessage = null; - StringBuilder sb = new StringBuilder(""); - for (Schema.Field field : input.getSchema().getFields()) { - if (first) { - first = false; - } else { - sb.append("&"); + StringBuilder sb = new StringBuilder(); + if (input != null && input.getSchema() != null) { + for (Schema.Field field : Objects.requireNonNull(input.getSchema().getFields())) { + if (first) { + first = false; + } else { + sb.append("&"); + } + sb.append(field.getName()); + sb.append("="); + sb.append((String) input.get(field.getName())); } - sb.append(field.getName()); - sb.append("="); - sb.append((String) input.get(field.getName())); } try { formMessage = URLEncoder.encode(sb.toString(), charset); @@ -212,13 +215,13 @@ private String createFormMessage(StructuredRecord input) { private String createCustomMessage(StructuredRecord input) { String customMessage = customMessageBody; Matcher matcher = Pattern.compile(REGEX_HASHED_VAR).matcher(customMessage); - HashMap findReplaceMap = new HashMap(); + HashMap findReplaceMap = new HashMap<>(); while (matcher.find()) { if (input.get(matcher.group(1)) != null) { findReplaceMap.put(matcher.group(1), (String) input.get(matcher.group(1))); } else { throw new IllegalArgumentException(String.format( - "Field %s doesnt exist in the input schema.", matcher.group(1))); + "Field %s doesnt exist in the input schema.", matcher.group(1))); } } Matcher replaceMatcher = Pattern.compile(REGEX_HASHED_VAR).matcher(customMessage); diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java b/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java index 1520f8e0..d219df1c 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java @@ -22,13 +22,12 @@ public class PlaceholderBean { private static final String PLACEHOLDER_FORMAT = "#%s"; private final String placeHolderKey; - private final String placeHolderKeyWithPrefix; private final int startIndex; private final int endIndex; public PlaceholderBean(String url, String placeHolderKey) { + String placeHolderKeyWithPrefix = String.format(PLACEHOLDER_FORMAT, placeHolderKey); this.placeHolderKey = placeHolderKey; - this.placeHolderKeyWithPrefix = String.format(PLACEHOLDER_FORMAT, placeHolderKey); this.startIndex = url.indexOf(placeHolderKeyWithPrefix); this.endIndex = startIndex + placeHolderKeyWithPrefix.length(); } diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java index 9fff34d5..108027e3 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java @@ -31,14 +31,19 @@ import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.LineageRecorder; -import io.cdap.plugin.http.common.pagination.page.BasePage; + +import io.cdap.plugin.http.common.HttpErrorDetailsProvider; import io.cdap.plugin.http.common.pagination.page.PageEntry; import org.apache.hadoop.io.NullWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -88,10 +93,14 @@ public void prepareRun(BatchSourceContext context) { .setFqn(config.getUrl()).build(); LineageRecorder lineageRecorder = new LineageRecorder(context, asset); lineageRecorder.createExternalDataset(schema); - lineageRecorder.recordRead("Read", String.format("Read from HTTP '%s'", config.getUrl()), - Preconditions.checkNotNull(schema.getFields()).stream() - .map(Schema.Field::getName) - .collect(Collectors.toList())); + List getNameList = Objects.nonNull(schema) ? Preconditions.checkNotNull(schema.getFields()).stream() + .map(Schema.Field::getName) + .collect(Collectors.toList()) : new ArrayList<>(); + lineageRecorder.recordRead("Read", String.format("Read from HTTP '%s'", config.getUrl()), getNameList); + + // set error details provider + context.setErrorDetailsProvider( + new ErrorDetailsProviderSpec(HttpErrorDetailsProvider.class.getName())); context.setInput(Input.of(config.getReferenceNameOrNormalizedFQN(), new HttpInputFormatProvider(config))); } diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java index 0f96f996..044f4026 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java @@ -17,7 +17,9 @@ import com.google.common.base.Strings; import com.google.gson.JsonSyntaxException; -import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.plugin.http.common.http.AuthType; import io.cdap.plugin.http.common.http.HttpClient; @@ -59,19 +61,14 @@ public void validate(FailureCollector failureCollector) { } public void validateCredentials(FailureCollector collector) { - try { - if (getAuthType() == AuthType.OAUTH2) { - validateOAuth2Credentials(collector); - } else if (getAuthType() == AuthType.BASIC_AUTH) { - validateBasicAuthCredentials(collector); - } - } catch (IOException e) { - String errorMessage = "Unable to authenticate the given info : " + e.getMessage(); - collector.addFailure(errorMessage, null); + if (getAuthType() == AuthType.OAUTH2) { + validateOAuth2Credentials(collector); + } else if (getAuthType() == AuthType.BASIC_AUTH) { + validateBasicAuthCredentials(collector); } } - private void validateOAuth2Credentials(FailureCollector collector) throws IOException { + private void validateOAuth2Credentials(FailureCollector collector) { if (!containsMacro(PROPERTY_CLIENT_ID) && !containsMacro(PROPERTY_CLIENT_SECRET) && !containsMacro(PROPERTY_TOKEN_URL) && !containsMacro(PROPERTY_REFRESH_TOKEN) && !containsMacro(PROPERTY_PROXY_PASSWORD) && !containsMacro(PROPERTY_PROXY_USERNAME) && @@ -93,25 +90,24 @@ private void validateOAuth2Credentials(FailureCollector collector) throws IOExce } catch (JsonSyntaxException | HttpHostConnectException e) { String errorMessage = "Error occurred during credential validation : " + e.getMessage(); collector.addFailure(errorMessage, null); + } catch (IOException e) { + String errorMessage = "Unable to validate OAuth and process the request."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, new IOException(errorMessage)); } } } - public void validateBasicAuthCredentials(FailureCollector collector) throws IOException { - try { - if (!containsMacro(PROPERTY_URL) && !containsMacro(PROPERTY_USERNAME) && !containsMacro(PROPERTY_PASSWORD) && - !containsMacro(PROPERTY_PROXY_USERNAME) && !containsMacro(PROPERTY_PROXY_PASSWORD) - && !containsMacro(PROPERTY_PROXY_URL)) { - HttpClient httpClient = new HttpClient(this); - validateBasicAuthResponse(collector, httpClient); - } - } catch (HttpHostConnectException e) { - String errorMessage = "Error occurred during credential validation : " + e.getMessage(); - collector.addFailure(errorMessage, "Please ensure that correct credentials are provided."); + public void validateBasicAuthCredentials(FailureCollector collector) { + if (!containsMacro(PROPERTY_URL) && !containsMacro(PROPERTY_USERNAME) && !containsMacro(PROPERTY_PASSWORD) && + !containsMacro(PROPERTY_PROXY_USERNAME) && !containsMacro(PROPERTY_PROXY_PASSWORD) + && !containsMacro(PROPERTY_PROXY_URL)) { + HttpClient httpClient = new HttpClient(this); + validateBasicAuthResponse(collector, httpClient); } } - public void validateBasicAuthResponse(FailureCollector collector, HttpClient httpClient) throws IOException { + public void validateBasicAuthResponse(FailureCollector collector, HttpClient httpClient) { try (CloseableHttpResponse response = httpClient.executeHTTP(getUrl())) { int statusCode = response.getStatusLine().getStatusCode(); if (statusCode != HttpStatus.SC_OK) { @@ -123,6 +119,13 @@ public void validateBasicAuthResponse(FailureCollector collector, HttpClient htt collector.addFailure(errorMessage, "Please ensure that correct credentials are provided."); } } + } catch (HttpHostConnectException e) { + String errorMessage = "Error occurred during credential validation : " + e.getMessage(); + collector.addFailure(errorMessage, "Please ensure that correct credentials are provided."); + } catch (IOException e) { + String errorMessage = "Unable to process the response and validate credentials"; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, e); } } @@ -195,10 +198,11 @@ public static class HttpBatchSourceConfigBuilder { private String password; - public HttpBatchSourceConfigBuilder setReferenceName (String referenceName) { + public HttpBatchSourceConfigBuilder setReferenceName(String referenceName) { this.referenceName = referenceName; return this; } + public HttpBatchSourceConfigBuilder setAuthUrl(String authUrl) { this.authUrl = authUrl; return this; diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormat.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormat.java index 69cee410..857fe72b 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormat.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormat.java @@ -15,7 +15,6 @@ */ package io.cdap.plugin.http.source.batch; -import io.cdap.plugin.http.common.pagination.page.BasePage; import io.cdap.plugin.http.common.pagination.page.PageEntry; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; @@ -29,7 +28,7 @@ /** * InputFormat for mapreduce job, which provides a single split of data. Since in general pagination cannot - * parallelized. + * parallelize. */ public class HttpInputFormat extends InputFormat { @Override diff --git a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java index acfa09f7..c17f0536 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java +++ b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java @@ -45,6 +45,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; import java.util.stream.Stream; @@ -545,7 +546,7 @@ public Map getFullFieldsMapping() { Map result = new HashMap<>(); if (!Strings.isNullOrEmpty(schema)) { - for (Schema.Field field : getSchema().getFields()) { + for (Schema.Field field : Objects.requireNonNull(Objects.requireNonNull(getSchema()).getFields())) { result.put(field.getName(), "/" + field.getName()); } } @@ -563,6 +564,7 @@ public String getReferenceNameOrNormalizedFQN() { return Strings.isNullOrEmpty(referenceName) ? ReferenceNames.normalizeFqn(url) : referenceName; } + public void validate(FailureCollector failureCollector) { super.validate(failureCollector); diff --git a/src/main/java/io/cdap/plugin/http/source/common/DelimitedSchemaDetector.java b/src/main/java/io/cdap/plugin/http/source/common/DelimitedSchemaDetector.java index d7bc3665..9172981c 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/DelimitedSchemaDetector.java +++ b/src/main/java/io/cdap/plugin/http/source/common/DelimitedSchemaDetector.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Objects; /** * Class that detects the schema of the delimited file. @@ -45,23 +46,23 @@ public static Schema detectSchema(HttpBatchSourceConfig config, String delimiter rowValue = getRowValues(line, config.getEnableQuotesValues(), delimiter); if (rowIndex == 0) { columnNames = DataTypeDetectorUtils.setColumnNames(line, config.getCsvSkipFirstRow(), - config.getEnableQuotesValues(), delimiter); - if (config.getCsvSkipFirstRow()) { + config.getEnableQuotesValues(), delimiter); + if (Boolean.TRUE.equals(config.getCsvSkipFirstRow())) { continue; } } DataTypeDetectorUtils.detectDataTypeOfRowValues(new HashMap<>(), dataTypeDetectorStatusKeeper, columnNames, - rowValue); + rowValue); } dataTypeDetectorStatusKeeper.validateDataTypeDetector(); } catch (Exception e) { failureCollector.addFailure(String.format("Error while reading the file to infer the schema. Error: %s", - e.getMessage()), null) - .withStacktrace(e.getStackTrace()); + e.getMessage()), null) + .withStacktrace(e.getStackTrace()); return null; } List fields = DataTypeDetectorUtils.detectDataTypeOfEachDatasetColumn( - new HashMap<>(), columnNames, dataTypeDetectorStatusKeeper); + new HashMap<>(), (Objects.nonNull(columnNames) ? columnNames : new String[0]), dataTypeDetectorStatusKeeper); return Schema.recordOf("text", fields); } diff --git a/src/main/java/io/cdap/plugin/http/source/common/RawStringPerLine.java b/src/main/java/io/cdap/plugin/http/source/common/RawStringPerLine.java index 2943d0d1..9514dba3 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/RawStringPerLine.java +++ b/src/main/java/io/cdap/plugin/http/source/common/RawStringPerLine.java @@ -16,6 +16,9 @@ package io.cdap.plugin.http.source.common; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.http.common.http.HttpResponse; import java.io.BufferedReader; @@ -61,14 +64,16 @@ public boolean hasNext() { isLineRead = true; return lastLine != null; } catch (IOException e) { // we need to catch this, since hasNext() does not have "throws" in parent - throw new RuntimeException("Failed to read line from http page buffer", e); + String errorMessage = "Unable to read line from http page buffer"; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, e); } } @Override public String next() { if (!hasNext()) { // calling hasNext will also read the line; - throw new NoSuchElementException(); + throw new NoSuchElementException("Unable to read the next line."); } isLineRead = false; return lastLine; diff --git a/src/test/java/io/cdap/plugin/http/etl/HttpStreamingSourceETLTest.java b/src/test/java/io/cdap/plugin/http/etl/HttpStreamingSourceETLTest.java index 80ffc272..213d9034 100644 --- a/src/test/java/io/cdap/plugin/http/etl/HttpStreamingSourceETLTest.java +++ b/src/test/java/io/cdap/plugin/http/etl/HttpStreamingSourceETLTest.java @@ -52,6 +52,7 @@ public class HttpStreamingSourceETLTest extends HttpSourceETLTest { private static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-streams", "1.0.0"); private static final int WAIT_FOR_RECORDS_TIMEOUT_SECONDS = 60; private static final long WAIT_FOR_RECORDS_POLLING_INTERVAL_MS = 100; + public static final String EXPLORE_ENABLED = "explore.enabled"; @BeforeClass public static void setupTest() throws Exception {