Merge pull request #7 from bnusunny/dev
split video in parallel
bnusunny authored Jan 21, 2021
2 parents 319278f + 4b10d84 commit 79fdcbb
Showing 23 changed files with 2,274 additions and 260 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -248,4 +248,6 @@ $RECYCLE.BIN/

 samconfig.toml
 .aws-sam
-packaged.yaml
+packaged.yaml
+
+efs
18 changes: 6 additions & 12 deletions README.md
@@ -13,22 +13,16 @@ Serverless video transcoding: using Step Functions, Lambda, and EFS to implement distributed video


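## Deployment
There are currently two deployment options: quick deployment and manual deployment.

### Quick deployment

The CloudFormation template in the Quickstart/templates directory deploys everything in one step. It creates a VPC with two public subnets, S3 and DynamoDB endpoints, an S3 bucket, a DynamoDB table, an EFS file system, the Lambda functions, and the Step Functions state machine.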

| Region | Launch Stack in VPC |
| :-------------------------: | :----------------------------------------------------------: |
| **Beijing** (cn-north-1) | [![cloudformation-launch-stack](https://s3.amazonaws.com/cloudformation-examples/cloudformation-launch-stack.png)](https://console.amazonaws.cn/cloudformation/home?region=cn-north-1#/stacks/new?stackName=serverless-video-transcoder&templateURL=https://aws-quickstart-cn.s3.cn-northwest-1.amazonaws.com.cn/serverless-video-transcoder/main.template.yaml) |
| **Ningxia** (cn-northwest-1) | [![cloudformation-launch-stack](https://s3.amazonaws.com/cloudformation-examples/cloudformation-launch-stack.png)](https://console.amazonaws.cn/cloudformation/home?region=cn-northwest-1#/stacks/new?stackName=serverless-video-transcoder&templateURL=https://aws-quickstart-cn.s3.cn-northwest-1.amazonaws.com.cn/serverless-video-transcoder/main.template.yaml) |
| Region | Launch Stack in VPC |
| :-------------------------: | :----------------------------------------------------------: |
| **Beijing** (cn-north-1) | [![cloudformation-launch-stack](https://s3.amazonaws.com/cloudformation-examples/cloudformation-launch-stack.png)](https://console.amazonaws.cn/cloudformation/home?region=cn-north-1#/stacks/new?stackName=serverless-video-transcoder&templateURL=https://serverless-video-transcoder-cn-north-1.s3.cn-north-1.amazonaws.com.cn/templates/template.yaml) |
| **Ningxia** (cn-northwest-1) | [![cloudformation-launch-stack](https://s3.amazonaws.com/cloudformation-examples/cloudformation-launch-stack.png)](https://console.amazonaws.cn/cloudformation/home?region=cn-northwest-1#/stacks/new?stackName=serverless-video-transcoder&templateURL=https://aws-quickstart-cn.s3.cn-northwest-1.amazonaws.com.cn/serverless-video-transcoder/template.yaml) |
| **N. Virginia** (us-east-1) | [![cloudformation-launch-stack](https://s3.amazonaws.com/cloudformation-examples/cloudformation-launch-stack.png)](https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks/new?stackName=serverless-video-transcoder&templateURL=https://serverless-video-transcoder.s3.amazonaws.com/templates/template.yaml) |
| **Tokyo** (ap-northeast-1) | [![cloudformation-launch-stack](https://s3.amazonaws.com/cloudformation-examples/cloudformation-launch-stack.png)](https://console.aws.amazon.com/cloudformation/home?region=ap-northeast-1#/stacks/new?stackName=serverless-video-transcoder&templateURL=https://serverless-video-transcoder-ap-northeast-1.s3-ap-northeast-1.amazonaws.com/templates/template.yaml) |

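### Manual deployment
1. Use an existing VPC (or create a new one), choose two subnets, and note their subnet IDs. The subnets must be able to reach S3 and DynamoDB, preferably through S3 and DynamoDB endpoints.
2. Create an EFS file system and access point in the VPC, and set the POSIX user and the Root directory creation permissions. Note the file system ID and access point ID; they are needed in step 4.
3. Run `sam build`.
4. Run `sam deploy --guided` and enter the corresponding parameters.


## Usage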
38 changes: 38 additions & 0 deletions events/s3.json
@@ -0,0 +1,38 @@
{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-1",
      "eventTime": "1970-01-01T00:00:00.000Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "EXAMPLE"
      },
      "requestParameters": {
        "sourceIPAddress": "127.0.0.1"
      },
      "responseElements": {
        "x-amz-request-id": "EXAMPLE123456789",
        "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "testConfigRule",
        "bucket": {
          "name": "serverless-video-transcoding",
          "ownerIdentity": {
            "principalId": "EXAMPLE"
          },
          "arn": "arn:aws:s3:::serverless-video-transcoding"
        },
        "object": {
          "key": "videos/wildlife_1h_4k.mp4",
          "size": 7210676572,
          "eTag": "617586411c792e23833f01ea820822d3-860",
          "sequencer": "0A1B2C3D4E5F678901"
        }
      }
    }
  ]
}
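
For readers testing against this sample event, the following is a minimal sketch of how a handler might pull the bucket, key, and size out of each record. The helper name `extract_s3_objects` and the trimmed `sample_event` are illustrative assumptions and are not part of this commit; only the field names come from events/s3.json.

```python
from urllib.parse import unquote_plus


def extract_s3_objects(event):
    """Return a (bucket, key, size) tuple for every record in an S3 event.

    Hypothetical helper for illustration; not taken from the repository.
    """
    objects = []
    for record in event.get("Records", []):
        s3 = record["s3"]
        bucket = s3["bucket"]["name"]
        # Keys in S3 notifications are URL-encoded (a space arrives as '+').
        key = unquote_plus(s3["object"]["key"])
        objects.append((bucket, key, s3["object"].get("size")))
    return objects


# Trimmed-down version of events/s3.json, keeping only the fields read above.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "serverless-video-transcoding"},
                "object": {"key": "videos/wildlife_1h_4k.mp4", "size": 7210676572},
            }
        }
    ]
}

objects = extract_s3_objects(sample_event)
```

Decoding with `unquote_plus` matters once uploaded file names contain spaces or non-ASCII characters; the sample key happens not to need it.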
File renamed without changes.
91 changes: 91 additions & 0 deletions functions/controller_function/app.py
@@ -0,0 +1,91 @@
import boto3
import os
import json
import math
import subprocess
from urllib.parse import unquote_plus
from botocore.config import Config

s3_client = boto3.client('s3', os.environ['AWS_REGION'], config=Config(
    s3={'addressing_style': 'path'}))
efs_path = os.environ['EFS_PATH']
PARALLEL_GROUPS = int(os.environ['PARALLEL_GROUPS'])
MAX_CONCURRENCY_MAP = 40

def analyze_video(bucket, key, video_file):
    video_file_presigned_url = s3_client.generate_presigned_url(
        ClientMethod='get_object',
        Params={'Bucket': bucket, 'Key': key},
        ExpiresIn=600
    )

    # get media information.
    cmd = ['ffprobe', '-loglevel', 'error', '-show_format',
           '-show_streams', '-of', 'json', video_file_presigned_url]

    print("get media information")
    return json.loads(subprocess.check_output(cmd))


def generate_control_data(video_details, segment_time, download_dir, object_name):
    control_data = {
        "video_details": video_details,
        "download_dir": download_dir,
        "object_name": object_name,
        "video_groups": []
    }

    video_stream = None
    for stream in video_details["streams"]:
        if stream["codec_type"] == "video":
            video_stream = stream
            break

    if video_stream is not None:
        video_duration = float(video_stream["duration"])
        segment_count = int(math.ceil(video_duration / segment_time))
        print("video duration: {}, segment_time: {}, segment_count: {}".format(
            video_duration, segment_time, segment_count))

        video_groups = []
        group_count = PARALLEL_GROUPS
        group_segment_count = math.ceil(1.0*segment_count/group_count)

        for group_index in range(0, group_count):
            video_segments = []
            for segment_index in range(0, group_segment_count):
                if segment_count <= 0:
                    break
                video_segments.append({
                    "start_ts": segment_time * (group_index * group_segment_count + segment_index),
                    "duration": segment_time,
                    "segment_order": group_index * group_segment_count + segment_index
                })
                segment_count -= 1
            video_groups.append(video_segments)

        control_data["video_groups"] = video_groups

    return control_data


def lambda_handler(event, context):
    bucket = event['bucket']
    key = event['key']
    object_prefix = event['object_prefix']
    object_name = event['object_name']
    download_dir = os.path.join(efs_path, event['job_id'])
    segment_time = int(event.get('segment_time', os.environ['DEFAULT_SEGMENT_TIME']))

    try:
        os.mkdir(download_dir)
    except FileExistsError as error:
        print('directory exist')

    os.chdir(download_dir)

    video_details = analyze_video(bucket, key, object_name)

    control_data = generate_control_data(video_details, segment_time, download_dir, object_name)

    return control_data
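
The grouping arithmetic in `generate_control_data` can be tried without AWS or ffprobe. The sketch below restates it as a pure function; the name `plan_segments` and its parameters are assumptions made for illustration, and the slicing with `range` is a rewrite of the commit's countdown loop that should produce the same groups.

```python
import math


def plan_segments(duration, segment_time, group_count):
    """Split a video of `duration` seconds into fixed-length segments and
    distribute them over `group_count` groups (hypothetical helper)."""
    segment_count = math.ceil(duration / segment_time)
    per_group = math.ceil(segment_count / group_count)
    groups = []
    for group_index in range(group_count):
        first = group_index * per_group
        last = min(first + per_group, segment_count)
        # range(first, last) is empty once all segments have been assigned,
        # so trailing groups may be empty lists, as in the original loop.
        groups.append([
            {
                "start_ts": segment_time * order,
                "duration": segment_time,
                "segment_order": order,
            }
            for order in range(first, last)
        ])
    return groups


# A 100-second video cut into 30-second segments over 3 groups.
groups = plan_segments(100, 30, 3)
```

In this example the four segments land in the first two groups and the third group is empty. The last segment also keeps the full `segment_time` as its duration even though only 10 seconds of video remain, which mirrors the commit's behavior.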
2 changes: 0 additions & 2 deletions functions/generate_hls_function/app.py

This file was deleted.

8 changes: 5 additions & 3 deletions functions/merge_video_function/app.py
@@ -48,17 +48,19 @@ def lambda_handler(event, context):

     # upload merged media to S3
     job_id = download_dir.split("/")[-1]
-    object_name = re.sub('_seg_.*', '.mp4', event[0][0]['transcoded_segment'])
+    object_name = event[0][0]['object_name']

     bucket = os.environ['MEDIA_BUCKET']
     key = 'output/{}/{}'.format(job_id, object_name)
-    s3_client.upload_file(merged_file, bucket, key)
+    s3_client.upload_file(merged_file, bucket, key, ExtraArgs={'ContentType': 'video/mp4'})
     # delete the temp download directory
     shutil.rmtree(download_dir)

     return {
         'download_dir': download_dir,
         'input_segments': len(segment_list),
         'merged_video': merged_file,
-        'create_hls': 0
+        'create_hls': 0,
+        'output_bucket': bucket,
+        'output_key': key
     }
Empty file.
77 changes: 0 additions & 77 deletions functions/split_video_function/app.py

This file was deleted.

Empty file.
25 changes: 14 additions & 11 deletions functions/transcode_video_function/app.py
@@ -4,30 +4,33 @@
 from urllib.parse import unquote_plus


-def transcode_segment(video_file):
-    img_prefix = video_file.split('.')[0]
-    img_filenames = img_prefix + '_720p.mp4'
+def transcode_segment(presigned_url, start_ts, duration, segment_order):
+    output_filename = 'tmp_' + str(segment_order) + '.mp4'

     # extract all i-frames as thumbnails
-    cmd = ['ffmpeg', '-loglevel', 'error', '-i', video_file, '-vf', "scale=-1:720", '-c:a', 'copy', img_filenames]
+    cmd = ['ffmpeg', '-v', 'error', '-ss', str(start_ts - 1), '-i', presigned_url, '-ss', '1', '-t', str(duration), '-vf', "scale=-1:720", '-x264opts', 'stitchable', '-c:a', 'copy', '-y', output_filename]

     # create thumbnails
-    print("trancoding the segment: " + video_file)
-    subprocess.call(cmd)
+    print("transcoding the segment")
+    subprocess.check_output(cmd)

-    return img_filenames
+    return output_filename


 def lambda_handler(event, context):
     download_dir = event['download_dir']
     os.chdir(download_dir)
-    video_segment = event['video_segments']
-    segment_order = video_segment['segment_order']
+    presigned_url = event['presigned_url']
+    object_name = event['object_name']
+    start_ts = event['video_segment']['start_ts']
+    duration = event['video_segment']['duration']
+    segment_order = event['video_segment']['segment_order']

-    result = transcode_segment(video_segment['segment_file'])
+    result = transcode_segment(presigned_url, start_ts, duration, segment_order)

     return {
         'download_dir': download_dir,
         'transcoded_segment': result,
-        'segment_order': segment_order
+        'segment_order': segment_order,
+        'object_name': object_name
     }
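
The new ffmpeg invocation seeks on the input to one second before the segment start and then trims that second on the output side with a second `-ss`. The sketch below rebuilds the same argument list as a testable function. The name `build_transcode_cmd`, the `preroll` parameter, and the clamping of the input seek at zero for the first segment are assumptions added for illustration; the commit itself passes `start_ts - 1` unchanged. The two comments in the hunk that mention thumbnails appear to be left over from earlier code and do not describe this command.

```python
def build_transcode_cmd(presigned_url, start_ts, duration, segment_order, preroll=1):
    """Build the ffmpeg argument list for one segment (hypothetical helper).

    Returns (cmd, output_filename). Flags are the ones used in the commit.
    """
    output_filename = "tmp_{}.mp4".format(segment_order)
    # Assumption: never seek before the start of the input. For start_ts >= preroll
    # this gives the same values as the commit (start_ts - 1 and 1).
    input_seek = max(start_ts - preroll, 0)
    output_seek = start_ts - input_seek
    cmd = [
        "ffmpeg", "-v", "error",
        "-ss", str(input_seek), "-i", presigned_url,   # fast seek on the input
        "-ss", str(output_seek), "-t", str(duration),  # trim the pre-roll, then cut
        "-vf", "scale=-1:720",
        "-x264opts", "stitchable",
        "-c:a", "copy",
        "-y", output_filename,
    ]
    return cmd, output_filename


# Third segment of a video split into 30-second pieces; the URL is a placeholder.
cmd, output_filename = build_transcode_cmd("https://example.com/video.mp4", 60, 30, 2)
```

Keeping command construction separate from `subprocess.check_output` makes the seek arithmetic easy to check in a unit test without running ffmpeg.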
6 changes: 6 additions & 0 deletions functions/trigger_statemachine_function/app.py
@@ -4,10 +4,15 @@
 import uuid
 import datetime
 from urllib.parse import unquote_plus
+from aws_xray_sdk.core import xray_recorder
+from aws_xray_sdk.core import patch_all
+
+patch_all()

 dynamodb = boto3.resource('dynamodb')
 job_table = dynamodb.Table(os.environ['JOB_TABLE'])
 create_hls = os.environ['ENABLE_HLS']
+segment_time = os.environ['DEFAULT_SEGMENT_TIME']
 sfn_client = boto3.client('stepfunctions')


@@ -40,6 +45,7 @@ def lambda_handler(event, context):
             'key': key,
             'object_prefix': object_prefix,
             'object_name': object_name,
+            "segment_time": segment_time,
             'create_hls': create_hls
         })
     )
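
Only part of the state machine input is visible in this hunk. As a rough sketch, the payload could be assembled as below, using the keys the controller function reads (`bucket`, `key`, `object_prefix`, `object_name`, `job_id`, `segment_time`). The helper name `build_execution_input`, the way `object_prefix` and `object_name` are derived from the key, and the use of `uuid4` for `job_id` are assumptions; the actual code outside the hunk may differ.

```python
import json
import uuid


def build_execution_input(bucket, key, segment_time, create_hls, job_id=None):
    """Serialize the input for a state machine execution (hypothetical helper)."""
    # Assumption: the prefix is everything before the last '/' in the key.
    object_prefix, _, object_name = key.rpartition("/")
    return json.dumps({
        "job_id": job_id or str(uuid.uuid4()),
        "bucket": bucket,
        "key": key,
        "object_prefix": object_prefix,
        "object_name": object_name,
        # Environment variables are strings; the controller converts with int().
        "segment_time": segment_time,
        "create_hls": create_hls,
    })


payload = json.loads(build_execution_input(
    "serverless-video-transcoding", "videos/wildlife_1h_4k.mp4", "30", "0", job_id="job-1"))
```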
1 change: 1 addition & 0 deletions functions/trigger_statemachine_function/requirements.txt
@@ -0,0 +1 @@
aws-xray-sdk==2.4.2