Skip to content

Commit

Permalink
HPCC-586 WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Rodrigo Pastrana <[email protected]>
  • Loading branch information
rpastrana committed May 29, 2024
1 parent 8dbcfb6 commit f580c4d
Show file tree
Hide file tree
Showing 4 changed files with 324 additions and 96 deletions.
48 changes: 48 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
<groups>org.hpccsystems.commons.annotations.BaseTests</groups>
<codehaus.template.version>1.0.0</codehaus.template.version>
<project.benchmarking>false</project.benchmarking>
<opentelemetry.version>2.4.0-alpha</opentelemetry.version>
</properties>

<scm>
Expand Down Expand Up @@ -99,7 +100,54 @@
</repository>
</repositories>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.38.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure-spi</artifactId>
</dependency>
<!-- Stable semantic conventions. Note: generated code is still subject to breaking changes while published with "-alpha" suffix. -->
<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.30.1-alpha</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
<dependency>
<!-- Not managed by opentelemetry-bom -->
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.25.0-alpha</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.Format;
import java.util.HashMap;
import java.util.Map;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.ParserConfigurationException;
Expand Down Expand Up @@ -40,13 +43,38 @@
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.HttpAttributes;
import io.opentelemetry.semconv.ResourceAttributes;
import io.opentelemetry.semconv.ServerAttributes;


/**
* Defines functionality common to all HPCC Systmes web service clients.
*
* Typically implemented by specialized HPCC Web service clients.
*/
public abstract class BaseHPCCWsClient extends DataSingleton
{
private static OpenTelemetry openTelemetry = null; // = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
/** Constant <code>log</code> */
protected static final Logger log = LogManager.getLogger(BaseHPCCWsClient.class);
/** Constant <code>DEAFULTECLWATCHPORT="8010"</code> */
Expand Down Expand Up @@ -164,6 +192,46 @@ private String getTargetHPCCBuildVersionString() throws Exception

}

public SpanBuilder getSpanBuilder(String spanName)
{
Tracer tracer = GlobalOpenTelemetry.getTracer("");
SpanBuilder spanBuilder = tracer.spanBuilder(spanName)
.setAttribute(ServerAttributes.SERVER_ADDRESS, wsconn.getHost())
//.setAttribute(ServerAttributes.SERVER_PORT, wsconn.getPort())
.setAttribute(HttpAttributes.HTTP_REQUEST_METHOD, HttpAttributes.HttpRequestMethodValues.GET)
.setSpanKind(SpanKind.CLIENT);
//.startSpan();

return spanBuilder;
}

static public void injectTraceParentHeader(Options options)
{
if (options != null)
{
Span currentSpan = Span.current();
if (currentSpan != null && currentSpan.getSpanContext().isValid())
W3CTraceContextPropagator.getInstance().inject(Context.current(), options, Options::setProperty);
}
}

static public String getTraceParentHeader()
{
String traceparent = null;
Span currentSpan = Span.current();
if (currentSpan != null && currentSpan.getSpanContext().isValid())
{
Map<String, String> carrier = new HashMap<>();
TextMapSetter<Map<String, String>> setter = Map::put;
W3CTraceContextPropagator.getInstance().inject(Context.current(), carrier, setter);

traceparent = carrier.getOrDefault("traceparent", "00-" + currentSpan.getSpanContext().getTraceId() + "-" + currentSpan.getSpanContext().getSpanId() + "-00");
carrier.clear();
}

return traceparent;
}

/**
* All instances of HPCCWsXYZClient should utilize this init function
* Attempts to establish the target HPCC build version and its container mode
Expand All @@ -175,6 +243,32 @@ private String getTargetHPCCBuildVersionString() throws Exception
*/
protected boolean initBaseWsClient(Connection connection, boolean fetchVersionAndContainerMode)
{
//we should do this higher in the stack if possible and only if enabled
if (openTelemetry == null)
{
//Create an instance of the OpenTelemetry interface using the OpenTelemetrySdk.
try
{
openTelemetry = GlobalOpenTelemetry.get();
if (openTelemetry == null)
{
openTelemetry = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
/*if we want to overwrite auto configuration:
openTelemetry = AutoConfiguredOpenTelemetrySdk.builder()
.addTracerProviderCustomizer(
(sdkTracerProviderBuilder, configProperties) ->
sdkTracerProviderBuilder.addSpanProcessor(
new SpanProcessor() { // implementation omitted for brevity
})).build();
*/
}
}
catch (Exception e) //ConfigurationException
{
System.out.println("OTEL autoconfiguration failure, please revise OTEL dependencies: " + e.getLocalizedMessage());
}
}

boolean success = true;
initErrMessage = "";
setActiveConnectionInfo(connection);
Expand Down Expand Up @@ -582,6 +676,10 @@ static public Stub setStubOptions(Stub thestub, Connection connection) throws Ax

opt.setProperty(HTTPConstants.CHUNKED, Boolean.FALSE);

//only do this if tracing enabled?
injectTraceParentHeader(opt);
//opt.setProperty("traceparent", getTraceParentHeader());

if (connection.getPreemptiveHTTPAuthenticate())
{
//Axis2 now forces connection authenticate, even if target is not secure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@
import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper;
import org.hpccsystems.ws.client.wrappers.wsworkunits.WsWorkunitsClientStubWrapper;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.StatusCode;

/**
* Facilitates ECL WorkUnit related actions.
*
Expand Down Expand Up @@ -295,6 +299,8 @@ public void fastWURefresh(WorkunitWrapper wu) throws Exception, ArrayOfEspExcept
{
verifyStub(); // Throws exception if stub failed

SpanBuilder spanBuilder = getSpanBuilder("FastWURefresh");

WUQuery request = new WUQuery();

WUState previousState = getStateID(wu);
Expand All @@ -304,29 +310,34 @@ public void fastWURefresh(WorkunitWrapper wu) throws Exception, ArrayOfEspExcept

WUQueryResponse response = null;

Span span = spanBuilder.startSpan();
try
{
response = ((WsWorkunits) stub).wUQuery(request);
}
catch (RemoteException e)
{
span.setStatus(StatusCode.ERROR, e.getLocalizedMessage());
throw new Exception("WsWorkunits.fastWURefresh(...) encountered RemoteException.", e);
}
catch (EspSoapFault e)
{
handleEspSoapFaults(new EspSoapFaultWrapper(e), "Could Not perform fastWURefresh");
span.setStatus(StatusCode.ERROR, e.getLocalizedMessage());
handleEspSoapFaults(new EspSoapFaultWrapper(e), "Could Not perform fastWURefresh",);
}

if (response != null)
{
if (response.getExceptions() != null)
handleEspExceptions(new ArrayOfEspExceptionWrapper(response.getExceptions()), "Could Not perform fastWURefresh");

span.setStatus(StatusCode.OK);
if (response.getWorkunits() != null)
{
ECLWorkunit[] eclWorkunit = response.getWorkunits().getECLWorkunit();

if (eclWorkunit != null && eclWorkunit.length == 1) wu.update(eclWorkunit[0]);
if (eclWorkunit != null && eclWorkunit.length == 1)
wu.update(eclWorkunit[0]);
}

if (previousState != getStateID(wu))
Expand Down
Loading

0 comments on commit f580c4d

Please sign in to comment.