Concurrent execution #18

Open · wants to merge 6 commits into base: main
3 changes: 3 additions & 0 deletions Dockerfile
@@ -4,6 +4,9 @@ FROM node:18-alpine
ENV AWS_ACCESS_KEY_ID='fake'
ENV AWS_SECRET_ACCESS_KEY='fake'

# Install Docker CLI
RUN apk add --no-cache docker-cli

WORKDIR /app
COPY package.json package.json
RUN npm install --production
34 changes: 33 additions & 1 deletion README.md
@@ -74,6 +74,31 @@ services:
# ...
```

## Using `docker exec` to run multiple commands at once

Normally, local-api-gateway invokes the function through the official Lambda RIE, which supports only a single execution at a time. To support concurrent execution, you can switch to Docker Exec mode.

```yaml
services:
  web:
    image: bref/local-api-gateway
    ports: ['8000:8000']
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - .:/var/task:ro
    environment:
      TARGET: 'php:8080' # service:port
      TARGET_CONTAINER: 'my-php-container' # specify if different to the host within TARGET
      TARGET_HANDLER: '/path/to/vendor/bin/bref-local handler.php' # The handler within /var/task; bref-local can be elsewhere
      DEV_MAX_REQUESTS_IN_PARALLEL: 10 # number to run simultaneously
      DEV_MAX_REQUESTS_IN_QUEUE: 20 # number to queue when capacity is reached
```

## Logging

For visibility during development, you can enable request logging by setting `LOG_LEVEL` to one of `none`, `info`, or `debug`.
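For example, in a `docker-compose.yml` (the service definition below is illustrative):

```yaml
services:
  web:
    image: bref/local-api-gateway
    environment:
      TARGET: 'php:8080'
      LOG_LEVEL: 'info' # one line per request; 'debug' also logs payloads and responses
```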

## FAQ

### This vs Serverless Offline
@@ -96,6 +121,13 @@ No, this is a very simple HTTP server. It does not support API Gateway features

### How are parallel requests handled?

This system offers two execution modes:

- Lambda RIE: one request runs at a time, queuing additional requests
- Docker Exec: handles parallel requests

The Lambda RIE does not support parallel requests, so this project "queues" them: if a request is already being processed, the next request is queued and processed once the first is done.
In Docker Exec mode, up to 10 requests run in parallel by default. You can change this limit by setting the `DEV_MAX_REQUESTS_IN_PARALLEL` environment variable.
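The queueing behaviour can be illustrated with a minimal sketch (this is the general idea, not the actual `express-queue` implementation used by the project):

```typescript
// Minimal sketch of an activeLimit-style queue: at most `activeLimit`
// tasks run at once; the rest wait in FIFO order.
type Task<T> = () => Promise<T>;

function makeQueue(activeLimit: number) {
    let active = 0;
    const waiting: Array<() => void> = [];

    const release = () => {
        active--;
        const next = waiting.shift();
        if (next) next();
    };

    return async <T>(task: Task<T>): Promise<T> => {
        if (active >= activeLimit) {
            // Wait until a running task finishes before starting
            await new Promise<void>((resolve) => waiting.push(resolve));
        }
        active++;
        try {
            return await task();
        } finally {
            release();
        }
    };
}

// With activeLimit = 1 (the RIE behaviour), tasks run strictly one after
// another: the slow first task completes before the fast second one starts.
const enqueue = makeQueue(1);
const order: string[] = [];
const delay = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

const done = Promise.all([
    enqueue(() => delay(20).then(() => order.push('first'))),
    enqueue(() => delay(1).then(() => order.push('second'))),
]).then(() => order.join(','));
```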

The Docker Exec mode runs `docker exec` on the target container for each request, so it can handle parallel requests up to the limits of the target container. Although useful for development, keep in mind that this is not the same as parallel requests in AWS Lambda, where each function instance handles a single isolated request.
To activate this mode, define the `TARGET_CONTAINER` and `TARGET_HANDLER` environment variables.
4 changes: 3 additions & 1 deletion package.json
@@ -8,8 +8,10 @@
    "dist"
  ],
  "scripts": {
    "build": "tsc && docker buildx build --load -t bref/local-api-gateway .",
    "code-build": "tsc",
    "docker-build": "docker buildx build --load -t bref/local-api-gateway .",
    "docker-publish": "npm run build && docker buildx build --push --platform linux/amd64,linux/arm64 -t bref/local-api-gateway .",
    "build": "npm run code-build && npm run docker-build",
    "lint": "eslint .",
    "test": "vitest"
  },
Expand Down
94 changes: 94 additions & 0 deletions src/docker.ts
@@ -0,0 +1,94 @@
import { spawn } from 'child_process';

/**
 * Utility function to promisify spawn
 *
 * @param command The command to run
 * @param args The args as an array of strings
 */
const asyncSpawn = (command: string, args: string[]): Promise<{ stdout: string; stderr: string }> => {
    return new Promise((resolve, reject) => {
        // 'child' avoids shadowing the global Node.js 'process' object
        const child = spawn(command, args);
        let stdout = '';
        let stderr = '';

        child.stdout.on('data', (data) => {
            stdout += data.toString();
        });

        child.stderr.on('data', (data) => {
            stderr += data.toString();
        });

        child.on('close', (code) => {
            if (code === 0) {
                resolve({ stdout, stderr });
            } else {
                reject(new Error(`Command exited with code ${code}\nSTDOUT: ${stdout}\nSTDERR: ${stderr}`));
            }
        });

        child.on('error', (err) => {
            reject(err);
        });
    });
};

/**
 * Handles bref-local output headers
 *
 * The 'bref-local' handler prefixes its output with lines that need to be stripped:
 *
 *   START
 *   END Duration ...
 *
 * The real output begins on the line after "END Duration ...".
 *
 * @param input
 */
function removeBrefLocalHeaders(input: string): string {
    const match = input.match(/END Duration:.*\n([\s\S]*)/);
    return match ? match[1].trim() : "";
}

/**
 * Runs the Docker Exec command
 *
 * @param container The docker container name (e.g. 'php')
 * @param handler The handler command (e.g. '/path/to/vendor/bref/bref-local handler.php')
 * @param payload The JSON-encoded payload
 */
export const runDockerCommand = async (container: string, handler: string, payload: string): Promise<string> => {
    // Build the docker command: '/usr/bin/docker exec $CONTAINER $HANDLER $PAYLOAD' for spawn
    const [command, ...handlerArgs] = handler.split(' ');
    const dockerCommand = [
        "exec",
        container,
        command,
        ...handlerArgs,
        payload,
    ];

    // Run the command and pull the output into a string
    let result: string | null = null;
    try {
        const { stdout, stderr } = await asyncSpawn("/usr/bin/docker", dockerCommand);
        if (stderr) {
            console.info(`END [DOCKER] COMMAND: `, dockerCommand);
            console.error(`END [DOCKER] STDERR: ${stderr}`);
        }
        result = Buffer.from(stdout).toString();
    } catch (error) {
        console.info(`END [DOCKER] COMMAND: `, dockerCommand);
        console.error(`END [DOCKER] ERROR: ${(error as Error).message}`);
        throw error;
    }

    // Strip header info from bref-local output
    if (handler?.includes('bref-local')) {
        result = removeBrefLocalHeaders(result);
    }

    return result;
};
74 changes: 55 additions & 19 deletions src/index.ts
@@ -4,7 +4,8 @@
import queue from 'express-queue';
import { APIGatewayProxyStructuredResultV2 } from 'aws-lambda';
import { InvocationType, InvokeCommand, InvokeCommandOutput, LambdaClient } from '@aws-sdk/client-lambda';
import { httpRequestToEvent } from './apiGateway';
import { httpRequestToEvent } from './apiGateway.js';
import { runDockerCommand } from './docker.js';
import bodyParser from 'body-parser';

const app = express();
@@ -15,37 +16,56 @@
    app.use(express.static(process.env.DOCUMENT_ROOT));
}

const target = process.env.TARGET;
if (!target || !target.includes(":")) {
    throw new Error(
        'The TARGET environment variable must be set and contain the domain + port of the target lambda container (for example, "localhost:9000")'
    );
}

// Logging levels: 'none', 'info', 'debug'
const doInfoLogging = ['info', 'debug'].includes(process.env.LOG_LEVEL ?? 'none');
const doDebugLogging = process.env.LOG_LEVEL === 'debug';

// Determine whether to use Docker CLI or AWS Lambda RIE for execution
const dockerHost = process.env.TARGET_CONTAINER ?? target.split(":")[0];
const dockerHandler = process.env.TARGET_HANDLER ?? '';
const mode = (dockerHost && dockerHandler) ? "docker" : "rie";
if (doInfoLogging) {
    if (mode === "docker") {
        console.log(`Using docker CLI on '${dockerHost}' via '${dockerHandler}'`);
    } else {
        console.log("Using AWS Lambda RIE environment - set TARGET_CONTAINER and TARGET_HANDLER environment variables to enable docker CLI mode");
    }
}
// Environment variables are strings, so parse them into numbers
const maxParallelRequests = parseInt(process.env.DEV_MAX_REQUESTS_IN_PARALLEL ?? '10', 10);
const maxQueuedRequests = parseInt(process.env.DEV_MAX_REQUESTS_IN_QUEUE ?? '-1', 10);

// Prevent parallel requests as Lambda RIE can only handle one request at a time
// The solution here is to use a request "queue":
// incoming requests are queued until the previous request is finished.
// See https://github.com/brefphp/bref/issues/1471
const requestQueue = queue({
    // max requests to process simultaneously
    activeLimit: 1,
    // max requests to process simultaneously (set to 1 for RIE mode)
    activeLimit: (mode === "docker") ? maxParallelRequests : 1,
    // max requests in queue until reject (-1 means do not reject)
    queuedLimit: process.env.DEV_MAX_REQUESTS_IN_PARALLEL ?? 10,
    queuedLimit: maxQueuedRequests,
    // handler to call when queuedLimit is reached (see below)
    rejectHandler: (req: Request, res: Response) => {
        res.status(503);
        res.send(
            'Too many requests in parallel, set the `DEV_MAX_REQUESTS_IN_PARALLEL` environment variable to increase the limit'
            'Too many requests in parallel, set the `DEV_MAX_REQUESTS_IN_PARALLEL` and `DEV_MAX_REQUESTS_IN_QUEUE` environment variables to control the limit'
        );
    },
});
app.use(requestQueue);

const target = process.env.TARGET;
if (!target) {
    throw new Error(
        'The TARGET environment variable must be set and contain the domain + port of the target lambda container (for example, "localhost:9000")'
    );
}
const client = new LambdaClient({
    region: 'us-east-1',
    endpoint: `http://${target}`,
});

app.use((err: Error, req: Request, res: Response, next: NextFunction) => {

    console.error(err.stack);
    res.status(500).send('Something broke!');
});
@@ -62,25 +82,41 @@
app.all('*', async (req: Request, res: Response, next) => {
    const event = httpRequestToEvent(req);

    let result: InvokeCommandOutput;
    let result: string;
    const requestContext = event?.requestContext?.http ?? {};
    try {
        result = await client.send(
            new InvokeCommand({
        const payload = Buffer.from(JSON.stringify(event)).toString();
        if (doInfoLogging) {
            console.log(`START [${mode.toUpperCase()}] ${requestContext?.method} ${requestContext?.path}`, doDebugLogging ? payload : '');
        }
        if (mode === "docker") {
            // Run via Docker
            result = await runDockerCommand(dockerHost, dockerHandler, payload);
        } else {
            // Run via Lambda RIE SDK
            const invokeCommand = new InvokeCommand({
                FunctionName: 'function',
                Payload: Buffer.from(JSON.stringify(event)),
                Payload: payload,
                InvocationType: InvocationType.RequestResponse,
            })
        );
            });
            const invokeResponse: InvokeCommandOutput = await client.send(invokeCommand);
            result = String(invokeResponse.Payload);
        }
        if (doInfoLogging) {
            console.log(`END [${mode.toUpperCase()}] ${requestContext?.method} ${requestContext?.path}`, doDebugLogging ? result : `${result.length} bytes`);
        }

    } catch (e) {
        console.error(`END [ERROR] ${requestContext?.method} ${requestContext?.path}`, e);
        res.send(JSON.stringify(e));

        return next(e);
    }

    if (!result.Payload) {
    if (!result) {
        return res.status(500).send('No payload in Lambda response');
    }
    const payload = Buffer.from(result.Payload).toString();
    const payload = Buffer.from(result).toString();
    let lambdaResponse: APIGatewayProxyStructuredResultV2;
    try {
        lambdaResponse = JSON.parse(payload) as APIGatewayProxyStructuredResultV2;