Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support full set of storm commands #60

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/main/java/com/yahoo/storm/yarn/ClassPathCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/

package com.yahoo.storm.yarn;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import com.yahoo.storm.yarn.Client.ClientCommand;

class ClassPathCommand implements ClientCommand {

ClassPathCommand() {
}

@Override
public Options getOpts() {
Options opts = new Options();
return opts;
}

@Override
public String getHeaderDescription() {
return "storm-yarn classpath";
}

@Override
public void process(CommandLine cl) throws Exception {
String classpath = System.getProperty("java.class.path");
System.out.println(classpath);
}
}
14 changes: 12 additions & 2 deletions src/main/java/com/yahoo/storm/yarn/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,12 @@ public void printHelpFor(Collection<String> args) {
public void execute(String[] args) throws Exception {
HashMap<String, ClientCommand> commands = new HashMap<String, ClientCommand>();
HelpCommand help = new HelpCommand(commands);

commands.put("help", help);
commands.put("version", new VersionCommand());
commands.put("classpath", new ClassPathCommand());
commands.put("launch", new LaunchCommand());

commands.put("setStormConfig", new StormMasterCommand(StormMasterCommand.COMMAND.SET_STORM_CONFIG));
commands.put("getStormConfig", new StormMasterCommand(StormMasterCommand.COMMAND.GET_STORM_CONFIG));
commands.put("addSupervisors", new StormMasterCommand(StormMasterCommand.COMMAND.ADD_SUPERVISORS));
Expand All @@ -113,7 +117,13 @@ public void execute(String[] args) throws Exception {
commands.put("startSupervisors", new StormMasterCommand(StormMasterCommand.COMMAND.START_SUPERVISORS));
commands.put("stopSupervisors", new StormMasterCommand(StormMasterCommand.COMMAND.STOP_SUPERVISORS));
commands.put("shutdown", new StormMasterCommand(StormMasterCommand.COMMAND.SHUTDOWN));
commands.put("version", new VersionCommand());

commands.put("jar", new StormTopologySubmitCommand());
commands.put("kill", new StormTopologyKillCommand());
commands.put("list", new StormTopologyListCommand());
commands.put("rebalance", new StormTopologyRebalanceCommand());
commands.put("activate", new StormTopologyActivateCommand());
commands.put("deactivate", new StormTopologyDeactivateCommand());

String commandName = null;
String[] commandArgs = null;
Expand All @@ -134,7 +144,7 @@ public void execute(String[] args) throws Exception {
if(!opts.hasOption("h")) {
opts.addOption("h", "help", false, "print out a help message");
}
CommandLine cl = new GnuParser().parse(command.getOpts(), commandArgs);
CommandLine cl = new GnuParser().parse(command.getOpts(), commandArgs, true);
if(cl.hasOption("help")) {
help.printHelpFor(Arrays.asList(commandName));
} else {
Expand Down
107 changes: 107 additions & 0 deletions src/main/java/com/yahoo/storm/yarn/StormCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package com.yahoo.storm.yarn;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Joiner;
import com.google.common.io.Files;
import com.yahoo.storm.yarn.Client.ClientCommand;
import com.yahoo.storm.yarn.generated.StormMaster;

/**
* Works as Storm shell command
*
*/
public abstract class StormCommand implements ClientCommand {

private static final Logger LOG = LoggerFactory.getLogger(StormCommand.class);

protected String appId;

@Override
public Options getOpts() {
Options opts = new Options();
opts.addOption("appId", true, "(Required) The storm clusters app ID");
return opts;
}

protected void process(String command, CommandLine cl) throws Exception {
this.appId = cl.getOptionValue("appId");

if (appId == null) {
throw new IllegalArgumentException("-appId is required");
}

Map stormConf = Config.readStormConfig(null);

StormOnYarn storm = null;

try {

storm = StormOnYarn.attachToApp(appId, stormConf);
StormMaster.Client client = storm.getClient();

File tmpStormConfDir = Files.createTempDir();
File tmpStormConf = new File(tmpStormConfDir, "storm.yaml");

StormMasterCommand.downloadStormYaml(client,
tmpStormConf.getAbsolutePath());

String stormHome = System.getProperty("storm.home");

List<String> commands = new ArrayList<String>();
commands.add("python"); // TODO: Change this to python home
commands.add(stormHome + "/bin/storm");
commands.add(command);

String[] args = cl.getArgs();
if (null != args && args.length > 0) {
for (int i = 0; i < args.length; i++) {
commands.add(args[i]);
}
}
commands.add("--config");
commands.add(tmpStormConf.getAbsolutePath());

LOG.info("Running: " + Joiner.on(" ").join(commands));
ProcessBuilder builder = new ProcessBuilder(commands);

Process process = builder.start();
Util.redirectStreamAsync(process.getInputStream(), System.out);
Util.redirectStreamAsync(process.getErrorStream(), System.err);

process.waitFor();

if (process.exitValue() == 0) {
if (null != tmpStormConfDir) {
tmpStormConf.delete();
}
}
} finally {
if (storm != null) {
storm.stop();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/

package com.yahoo.storm.yarn;

import org.apache.commons.cli.CommandLine;

class StormTopologyActivateCommand extends StormCommand {

@Override
public String getHeaderDescription() {
return "storm-yarn activate -appId=xx topologyId";
}

@Override
public void process(CommandLine cl) throws Exception {
super.process("activate", cl);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/

package com.yahoo.storm.yarn;

import org.apache.commons.cli.CommandLine;

class StormTopologyDeactivateCommand extends StormCommand {

@Override
public String getHeaderDescription() {
return "storm-yarn deactivate -appId=xx topologyId";
}

@Override
public void process(CommandLine cl) throws Exception {
super.process("deactivate", cl);
}
}
30 changes: 30 additions & 0 deletions src/main/java/com/yahoo/storm/yarn/StormTopologyKillCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/

package com.yahoo.storm.yarn;

import org.apache.commons.cli.CommandLine;

class StormTopologyKillCommand extends StormCommand {

@Override
public String getHeaderDescription() {
return "storm-yarn kill -appId=xx -w wait-time-seconds topologyId";
}

@Override
public void process(CommandLine cl) throws Exception {
super.process("kill", cl);
}
}
30 changes: 30 additions & 0 deletions src/main/java/com/yahoo/storm/yarn/StormTopologyListCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/

package com.yahoo.storm.yarn;

import org.apache.commons.cli.CommandLine;

class StormTopologyListCommand extends StormCommand {

@Override
public String getHeaderDescription() {
return "storm-yarn list -appId=xx";
}

@Override
public void process(CommandLine cl) throws Exception {
process("list", cl);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/

package com.yahoo.storm.yarn;

import org.apache.commons.cli.CommandLine;

class StormTopologyRebalanceCommand extends StormCommand {

@Override
public String getHeaderDescription() {
return "storm-yarn rebalance -appId=<app Id> topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*]";
}

@Override
public void process(CommandLine cl) throws Exception {
process("rebalance", cl);
}
}
30 changes: 30 additions & 0 deletions src/main/java/com/yahoo/storm/yarn/StormTopologySubmitCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/

package com.yahoo.storm.yarn;

import org.apache.commons.cli.CommandLine;

class StormTopologySubmitCommand extends StormCommand {

@Override
public String getHeaderDescription() {
return "storm-yarn jar -appId=xx jarPath MainClass arg0 arg1 arg2";
}

@Override
public void process(CommandLine cl) throws Exception {
process("jar", cl);
}
}
Loading