Supervisor Resource capability configuration #64

Open · wants to merge 4 commits into master
2 changes: 1 addition & 1 deletion pom.xml
@@ -63,7 +63,7 @@
</repositories>
<properties>
<storm.version>0.9.0-wip21</storm.version>
<hadoop.version>2.1.0-beta</hadoop.version>
<hadoop.version>2.2.0</hadoop.version>
Collaborator:

If this requires Hadoop 2.2 then we probably want to find the correct version of CDH and HDP that also work with it and update them accordingly. (Just because people are going to ask for it)

Author:

Sure. HDP seems to be at 2.2.0.2.1.0.0-92.
CDH on the other hand seems to be on a SNAPSHOT? Wasn't sure: (https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/hadoop/hadoop-common/)

<!--hadoop.version>2.1.0.2.0.5.0-67</hadoop.version-->
</properties>
<dependencies>
7 changes: 6 additions & 1 deletion src/main/java/com/yahoo/storm/yarn/Config.java
@@ -36,7 +36,12 @@ public class Config {
//# of milliseconds to wait for YARN report on Storm Master host/port
final public static String YARN_REPORT_WAIT_MILLIS = "yarn.report.wait.millis";
final public static String MASTER_HEARTBEAT_INTERVAL_MILLIS = "master.heartbeat.interval.millis";


//Size of the supervisor container to request in YARN. This includes the
// supervisor and its workers
final public static String SUPERVISOR_SIZE_MB = "supervisor.container.size-mb";
final public static int DEFAULT_SUPERVISOR_SIZE = 1024;
Collaborator:

This value seems really low as a default. The default Storm configs have the supervisor using a 256 MB heap and each worker using a 768 MB heap. This only gives us space for one worker, assuming there is no other overhead, and by default there are 4 worker slots configured. I would prefer to see the default be 256 + 4 * 768 + 256 (slush space) = 3584 MB. Alternatively you could, as with the VCores, dynamically compute something based off of the number of workers, but have this config be an override for that.

Author:

Sure, I will configure it correctly with an override in case anyone wants to.
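A minimal sketch of the dynamic default discussed above, with the override taking precedence. The helper and class names are hypothetical, not storm-yarn code; the 256 MB supervisor heap, 768 MB worker heap, and 256 MB slush figures are the ones quoted in this thread:

```java
import java.util.Map;

// Hypothetical sketch: size the supervisor container from the worker count,
// letting supervisor.container.size-mb override the computed value when set.
class SupervisorSizing {
    static final String SUPERVISOR_SIZE_MB = "supervisor.container.size-mb";

    static int computeSupervisorSizeMB(Map<String, Object> stormConf, int numWorkers) {
        Object override = stormConf.get(SUPERVISOR_SIZE_MB);
        if (override != null) {
            return ((Number) override).intValue(); // explicit config wins
        }
        int supervisorHeapMB = 256; // default supervisor heap quoted above
        int workerHeapMB = 768;     // default worker heap quoted above
        int slushMB = 256;          // headroom for non-heap overhead
        return supervisorHeapMB + numWorkers * workerHeapMB + slushMB;
    }
}
```

With the default 4 worker slots this yields 256 + 4 * 768 + 256 = 3584 MB.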


@SuppressWarnings("rawtypes")
static public Map readStormConfig() {
return readStormConfig(null);
15 changes: 11 additions & 4 deletions src/main/java/com/yahoo/storm/yarn/MasterServer.java
@@ -80,10 +80,16 @@ public void run() {
if (allocatedContainers.size() > 0) {
// Add newly allocated containers to the client.
LOG.info("HB: Received allocated containers (" + allocatedContainers.size() + ")");
client.addAllocatedContainers(allocatedContainers);
if (client.supervisorsAreToRun()) {
LOG.info("HB: Supervisors are to run, so queueing (" + allocatedContainers.size() + ") containers...");
launcherQueue.addAll(allocatedContainers);
for(Container allocatedContainer : allocatedContainers) {
if(client.addAllocatedContainer(allocatedContainer)){
if(LOG.isDebugEnabled()) {
LOG.debug("HB: Queuing supervisor container["+allocatedContainer+"]");
}
launcherQueue.add(allocatedContainer);
}
}
} else {
LOG.info("HB: Supervisors are to stop, so releasing all containers...");
client.stopAllSupervisors();
@@ -95,6 +101,9 @@

if (completedContainers.size() > 0 && client.supervisorsAreToRun()) {
LOG.debug("HB: Containers completed (" + completedContainers.size() + "), so releasing them.");
for(ContainerStatus containerStatus : completedContainers) {
client.stopSupervisors(containerStatus.getContainerId());
}
client.startAllSupervisors();
}

@@ -162,8 +171,6 @@ public static void main(String[] args) throws Exception {
RegisterApplicationMasterResponse resp =
rmClient.registerApplicationMaster(addr.getHostName(), port, null);
LOG.info("Got a registration response "+resp);
LOG.info("Max Capability "+resp.getMaximumResourceCapability());
rmClient.setMaxResource(resp.getMaximumResourceCapability());
LOG.info("Starting HB thread");
server.initAndStartHeartbeat(rmClient, launcherQueue,
(Integer) storm_conf
192 changes: 140 additions & 52 deletions src/main/java/com/yahoo/storm/yarn/StormAMRMClient.java
@@ -16,43 +16,31 @@

package com.yahoo.storm.yarn;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.security.UserGroupInformation;
import backtype.storm.utils.Utils;
Collaborator:

I am not sure rearranging the ordering of the imports is that necessary here. It makes the diff a bit difficult to read.

Author:

Oh man, sorry about that. I must have "organized my imports" in the IDE.

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.utils.Utils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

class StormAMRMClient extends AMRMClientImpl<ContainerRequest> {
private static final Logger LOG = LoggerFactory.getLogger(StormAMRMClient.class);
@@ -61,10 +49,10 @@ class StormAMRMClient extends AMRMClientImpl<ContainerRequest> {
private final Map storm_conf;
private final YarnConfiguration hadoopConf;
private final Priority DEFAULT_PRIORITY = Records.newRecord(Priority.class);
private final Set<Container> containers;
private final BiMap<NodeId, ContainerId> runningSupervisors;
private final Resource supervisorResource;
private volatile boolean supervisorsAreToRun = false;
private AtomicInteger numSupervisors;
private Resource maxResourceCapability;
private ApplicationAttemptId appAttemptId;
private NMClientImpl nmClient;

@@ -76,21 +64,56 @@ public StormAMRMClient(ApplicationAttemptId appAttemptId,
this.hadoopConf = hadoopConf;
Integer pri = Utils.getInt(storm_conf.get(Config.MASTER_CONTAINER_PRIORITY));
this.DEFAULT_PRIORITY.setPriority(pri);
this.containers = new TreeSet<Container>();
numSupervisors = new AtomicInteger(0);
runningSupervisors = Maps.synchronizedBiMap(HashBiMap.<NodeId,
ContainerId>create());

// start am nm client
nmClient = (NMClientImpl) NMClient.createNMClient();
nmClient.init(hadoopConf);
nmClient.start();

//get number of slots for supervisor
int numWorkersPerSupervisor = Util.getNumWorkers(storm_conf);
Collaborator:

I'm not really sure we want to request CPU this way. I can see the supervisor only using 1 VCore, but by default the workers are multi-threaded. They may only add up to one VCore, but if a worker is really loaded it may use many times that. I am fine with this as a default, but I would like to see a config that can override it.

Author:

Yeah that makes perfect sense. Will do.
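A sketch of the override suggested here. The key name `supervisor.container.vcores` and the class are hypothetical, not an existing storm-yarn setting:

```java
import java.util.Map;

// Hypothetical sketch: default to one VCore per worker plus one for the
// supervisor itself, unless supervisor.container.vcores is set explicitly.
class SupervisorVCores {
    static final String SUPERVISOR_VCORES = "supervisor.container.vcores";

    static int computeSupervisorVCores(Map<String, Object> stormConf, int numWorkers) {
        Object override = stormConf.get(SUPERVISOR_VCORES);
        if (override != null) {
            return ((Number) override).intValue(); // operator override for loaded workers
        }
        return numWorkers + 1; // one per worker, plus the supervisor itself
    }
}
```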

int supervisorSizeMB = Util.getSupervisorSizeMB(storm_conf);
//add 1 for the supervisor itself
supervisorResource =
Resource.newInstance(supervisorSizeMB, numWorkersPerSupervisor + 1);
LOG.info("Supervisors will allocate Yarn Resource["+supervisorResource+"]");
}

public synchronized void startAllSupervisors() {
LOG.debug("Starting all supervisors, requesting containers...");
this.supervisorsAreToRun = true;
this.addSupervisorsRequest();
}


/**
* Stops supervisors by {@link NodeId}.
* @param nodeIds nodes whose supervisors should be stopped
*/
public synchronized void stopSupervisors(NodeId... nodeIds) {
if(LOG.isDebugEnabled()){
LOG.debug(
"Stopping supervisors at nodes[" + Arrays.toString(nodeIds) + "], " +
"releasing all containers.");
}
releaseSupervisors(nodeIds);
}

/**
* Stops supervisors by {@link ContainerId}.
* @param containerIds supervisor containers to stop
*/
public synchronized void stopSupervisors(ContainerId... containerIds) {
if(LOG.isDebugEnabled()){
LOG.debug("Stopping supervisors in containers[" +
Arrays.toString(containerIds) + "], " +
"releasing all containers.");
}
releaseSupervisors(containerIds);
}

public synchronized void stopAllSupervisors() {
LOG.debug("Stopping all supervisors, releasing all containers...");
this.supervisorsAreToRun = false;
@@ -100,36 +123,106 @@ public synchronized void stopAllSupervisors() {
private void addSupervisorsRequest() {
int num = numSupervisors.getAndSet(0);
for (int i=0; i<num; i++) {
ContainerRequest req = new ContainerRequest(this.maxResourceCapability,
ContainerRequest req = new ContainerRequest(this.supervisorResource,
null, // String[] nodes,
null, // String[] racks,
DEFAULT_PRIORITY);
super.addContainerRequest(req);
}
}

public synchronized boolean addAllocatedContainers(List<Container> containers) {
for (int i=0; i<containers.size(); i++) {
ContainerRequest req = new ContainerRequest(this.maxResourceCapability,
null, // String[] nodes,
null, // String[] racks,
DEFAULT_PRIORITY);
super.removeContainerRequest(req);

/**
* Add supervisor from allocation. If supervisor is already running there,
* release the assigned container.
*
* @param container Container to make supervisor
* @return true if supervisor assigned, false if supervisor is not assigned
*/
public synchronized boolean addAllocatedContainer(Container container) {
ContainerId containerId = container.getId();
NodeId nodeId = container.getNodeId();

//check if a supervisor is already running at this host; if so, do not accept
if (!runningSupervisors.containsKey(nodeId)) {
//add a running supervisor
addRunningSupervisor(nodeId, containerId);

ContainerRequest req = new ContainerRequest(this.supervisorResource,
null, // String[] nodes,
null, // String[] racks,
DEFAULT_PRIORITY);
super.removeContainerRequest(req);
return true;
} else {
//deallocate this request
LOG.info("Supervisor already running on node["+nodeId+"]");
super.releaseAssignedContainer(containerId);
//since no allocation, increase the number of supervisors to allocate
//in hopes that another node may be added in the future
numSupervisors.incrementAndGet();
return false;
}
}
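The javadoc above describes a one-supervisor-per-node invariant. A plain-Java sketch of that accept-or-release decision, using String stand-ins for YARN's NodeId and ContainerId types:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch, not storm-yarn code: accept a container only if no
// supervisor runs on its node yet; otherwise count a replacement request so a
// different node may be allocated on a later heartbeat.
class AllocationSketch {
    final Map<String, String> runningSupervisors = new HashMap<>(); // nodeId -> containerId
    int pendingSupervisors = 0;

    boolean addAllocatedContainer(String nodeId, String containerId) {
        if (!runningSupervisors.containsKey(nodeId)) {
            runningSupervisors.put(nodeId, containerId); // the real code also blacklists the node
            return true;
        }
        pendingSupervisors++; // re-request, hoping another node joins later
        return false;         // caller releases the duplicate container
    }
}
```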

/**
* Adds this node and container to the running supervisors, and adds the
* node to the blacklist so no further containers are allocated there.
* @param nodeId
* @param containerId
*/
private void addRunningSupervisor(NodeId nodeId, ContainerId containerId) {
runningSupervisors.put(nodeId, containerId);
//add to blacklist, so no more resources are allocated here
super.updateBlacklist(Lists.newArrayList(nodeId.getHost()), null);
}

/**
* Removes this node from the running supervisors, and removes it from
* the blacklist.
* @param nodeId Node to remove
* @return ContainerId if in the list of running supervisors, null if not
*/
private ContainerId removeRunningSupervisor(NodeId nodeId) {
ContainerId containerId = runningSupervisors.remove(nodeId);
if(containerId != null) {
super.updateBlacklist(null, Lists.newArrayList(nodeId.getHost()));
}
return this.containers.addAll(containers);
return containerId;
}

private synchronized void releaseAllSupervisorsRequest() {
Iterator<Container> it = this.containers.iterator();
ContainerId id;
while (it.hasNext()) {
id = it.next().getId();
LOG.debug("Releasing container (id:"+id+")");
releaseAssignedContainer(id);
it.remove();
Set<NodeId> nodeIds = runningSupervisors.keySet();
this.releaseSupervisors(nodeIds.toArray(new NodeId[nodeIds.size()]));
}

/**
* Main entry point for releasing supervisors.
* @param nodeIds nodes whose supervisor containers should be released
*/
private synchronized void releaseSupervisors(NodeId... nodeIds) {
for(NodeId nodeId : nodeIds) {
//remove from running supervisors list
ContainerId containerId = removeRunningSupervisor(nodeId);
if(containerId != null) {
LOG.debug("Releasing container (id:"+containerId+")");
//release the containers on the specified nodes
super.releaseAssignedContainer(containerId);
//increase the number of supervisors to request on the next heartbeat
numSupervisors.incrementAndGet();
}
}
}

private synchronized void releaseSupervisors(ContainerId... containerIds) {
BiMap<ContainerId, NodeId> inverse = runningSupervisors.inverse();
for(ContainerId containerId : containerIds) {
NodeId nodeId = inverse.get(containerId);
if(nodeId != null) {
this.releaseSupervisors(nodeId);
}
}
}

public synchronized boolean supervisorsAreToRun() {
return this.supervisorsAreToRun;
}
@@ -205,9 +298,4 @@ else if (vis.equals("APPLICATION"))
System.exit(-1);
}
}

public void setMaxResource(Resource maximumResourceCapability) {
this.maxResourceCapability = maximumResourceCapability;
LOG.info("Max Capability is now "+this.maxResourceCapability);
}
}
25 changes: 25 additions & 0 deletions src/main/java/com/yahoo/storm/yarn/Util.java
@@ -41,6 +41,7 @@
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import backtype.storm.utils.Utils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -61,6 +62,8 @@

class Util {
private static final String STORM_CONF_PATH_STRING = "conf" + Path.SEPARATOR + "storm.yaml";
private static final String STORM_CONF_SUPERVISOR_SLOTS_PORTS =
"supervisor.slots.ports";

static String getStormHome() {
String ret = System.getProperty("storm.home");
@@ -139,6 +142,28 @@ static void rmNulls(Map map) {
}
}

/**
* Gets the number of workers for a supervisor, counted from the slots
* listed in the supervisor.slots.ports storm configuration.
*
* @param stormConf the storm configuration map
* @return the number of worker slots, or 1 if none are configured
*/
@SuppressWarnings("rawtypes")
static int getNumWorkers(Map stormConf) {
List slots = (List) stormConf.get(STORM_CONF_SUPERVISOR_SLOTS_PORTS);
if(slots == null || slots.size() == 0) {
return 1; //default to one worker
}
return slots.size();
}

static int getSupervisorSizeMB(Map stormConf) {
Object sizeObj = stormConf.get(Config.SUPERVISOR_SIZE_MB);
return sizeObj != null ? Utils.getInt(sizeObj) :
Config.DEFAULT_SUPERVISOR_SIZE;
}

@SuppressWarnings("rawtypes")
static Path createConfigurationFileInFs(FileSystem fs,
String appHome, Map stormConf, YarnConfiguration yarnConf)