Skip to content

Commit

Permalink
Refactor pulsar function admin api location (#1508)
Browse files Browse the repository at this point in the history
* Refactor pulsar function admin api location
  • Loading branch information
rdhabalia authored Apr 6, 2018
1 parent fb3511d commit acc3151
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 137 deletions.
12 changes: 12 additions & 0 deletions pulsar-client-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-proto-shaded</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.shaded.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.shaded.proto.InstanceCommunication.FunctionStatusList;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.client.admin.internal.BrokerStatsImpl;
import org.apache.pulsar.client.admin.internal.BrokersImpl;
import org.apache.pulsar.client.admin.internal.ClustersImpl;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
import org.apache.pulsar.client.admin.internal.LookupImpl;
import org.apache.pulsar.client.admin.internal.NamespacesImpl;
Expand Down Expand Up @@ -76,6 +77,7 @@ public class PulsarAdmin implements Closeable {
private final Client client;
private final String serviceUrl;
private final Lookup lookups;
private final Functions functions;
protected final WebTarget root;
protected final Authentication auth;

Expand Down Expand Up @@ -170,6 +172,7 @@ public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData)
this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth);
this.resourceQuotas = new ResourceQuotasImpl(root, auth);
this.lookups = new LookupImpl(root, auth, useTls);
this.functions = new FunctionsImpl(root, auth);
}

/**
Expand Down Expand Up @@ -303,6 +306,14 @@ public Lookup lookups() {
return lookups;
}

/**
*
* @return the functions management object
*/
public Functions functions() {
return functions;
}

/**
* @return the broker statics
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,24 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.policies.data.*;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.shaded.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.shaded.proto.InstanceCommunication.FunctionStatusList;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;

import org.apache.pulsar.functions.shaded.com.google.protobuf.MessageOrBuilder;
import org.apache.pulsar.functions.shaded.com.google.protobuf.AbstractMessage.Builder;
import org.apache.pulsar.functions.shaded.com.google.protobuf.util.JsonFormat;

import javax.ws.rs.ClientErrorException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.util.List;

@Slf4j
Expand Down Expand Up @@ -72,7 +76,7 @@ public FunctionConfig getFunction(String tenant, String namespace, String functi
}
String jsonResponse = response.readEntity(String.class);
FunctionConfig.Builder functionConfigBuilder = FunctionConfig.newBuilder();
Utils.mergeJson(jsonResponse, functionConfigBuilder);
mergeJson(jsonResponse, functionConfigBuilder);
return functionConfigBuilder.build();
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -89,7 +93,7 @@ public FunctionStatusList getFunctionStatus(
}
String jsonResponse = response.readEntity(String.class);
FunctionStatusList.Builder functionStatusBuilder = FunctionStatusList.newBuilder();
Utils.mergeJson(jsonResponse, functionStatusBuilder);
mergeJson(jsonResponse, functionStatusBuilder);
return functionStatusBuilder.build();
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -104,7 +108,7 @@ public void createFunction(FunctionConfig functionConfig, String fileName) throw
mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));

mp.bodyPart(new FormDataBodyPart("functionConfig",
Utils.printJson(functionConfig),
printJson(functionConfig),
MediaType.APPLICATION_JSON_TYPE));
request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
.post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
Expand All @@ -131,7 +135,7 @@ public void updateFunction(FunctionConfig functionConfig, String fileName) throw
mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
}
mp.bodyPart(new FormDataBodyPart("functionConfig",
Utils.printJson(functionConfig),
printJson(functionConfig),
MediaType.APPLICATION_JSON_TYPE));
request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
.put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
Expand Down Expand Up @@ -159,4 +163,12 @@ public String triggerFunction(String tenant, String namespace, String functionNa
throw getApiException(e);
}
}

public static void mergeJson(String json, Builder builder) throws IOException {
JsonFormat.parser().merge(json, builder);
}

public static String printJson(MessageOrBuilder msg) throws IOException {
return JsonFormat.printer().print(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,31 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;

import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
import java.io.File;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import java.util.HashMap;
import java.util.Map;

import com.google.gson.Gson;
import org.apache.pulsar.admin.cli.CmdFunctions.CreateFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions;
import org.apache.pulsar.admin.cli.CmdFunctions.LocalRunner;
import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdminWithFunctions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.shaded.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
import org.apache.pulsar.functions.utils.Reflections;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
Expand All @@ -67,6 +65,8 @@
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;

import com.google.gson.Gson;

/**
* Unit test of {@link CmdFunctions}.
*/
Expand All @@ -81,7 +81,7 @@ public IObjectFactory getObjectFactory() {

private static final String TEST_NAME = "test_name";

private PulsarAdminWithFunctions admin;
private PulsarAdmin admin;
private Functions functions;
private CmdFunctions cmd;

Expand All @@ -100,7 +100,7 @@ private String generateCustomSerdeInputs(String topic, String serde) {

@BeforeMethod
public void setup() throws Exception {
this.admin = mock(PulsarAdminWithFunctions.class);
this.admin = mock(PulsarAdmin.class);
this.functions = mock(Functions.class);
when(admin.functions()).thenReturn(functions);
when(admin.getServiceUrl()).thenReturn("http://localhost:1234");
Expand Down
Loading

0 comments on commit acc3151

Please sign in to comment.