HPC with Step Functions


Written by Robert Koch

High-Performance Computing (HPC) has become a mainstream method for processing big data in machine learning and data science. But the systems used to build high-performance clusters are not always accessible to everyday users. I found this out the hard way recently while working on a project to calculate the maximum number of objects I could store in S3. I found a better way to solve the problem: using AWS Step Functions and distributed maps to create a serverless HPC cluster capable of running lots of small tasks at incredible speed for a fraction of the price. Today I'm going to show you how I did it.
The project this article references can be found on GitHub. Built with the AWS CDK, it can be deployed into an AWS environment in just a few minutes.

Let's have a look at what we're deploying. For this project I needed to evaluate a number of large equations and then sum all of the results together. You can read the previous article for the details of what is being calculated, but to quickly get you up to speed: we're counting the total number of S3 key values that are valid Unicode character combinations. The architecture diagram below shows how this project will compute the result.

Architecture Diagram for the HPC Step Function.

As you can see, the design of what we're going to build is very simple. Three Lambda functions make up the compute component of our cluster; a rough sketch of the payload that flows between them follows the list below.

  1. Start function, which creates the inputs for the number of jobs we want to run.
  2. Compute function, which runs the calculation for the workload. It is invoked in parallel across a distributed map to compute the answer.
  3. Sum function, which takes the results from the compute invocations and sums them to produce the final answer.
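
To make the data flow concrete, here is that sketch. It is not taken from the repository and the values are illustrative; the field names match the Lambda handlers shown later in the article.

# Illustrative payloads only (values made up); field names match the handlers below.

start_output = {"size": 4, "items": [0, 1, 2, 3]}           # Start: one item per map iteration

compute_input = {"index": 2, "size": 4}                      # Compute: invoked once per item
compute_output = {"index": 2, "size": 4, "result": 16511}    # "result" value is made up

sum_output = {"total_sum": 123456}                           # Sum: totals every "result" read back from S3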
Using CDK we can create a stack to deploy these functions into our AWS environment. To create a CDK project, install the AWS CDK and run the init command.
cdk init app --language typescript
Inside the lib directory will be a *-stack.ts file. This is where we will define the constructs for our stack. To start with, let's create constructs for our three functions.
import * as cdk from 'aws-cdk-lib'
import { Construct } from 'constructs'
import { Runtime } from 'aws-cdk-lib/aws-lambda'
import { Aws, Duration } from 'aws-cdk-lib'
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'

export class HpcBatchStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props)

    const startFunction = new PythonFunction(this, 'LambdaStart', {
      entry: 'lib/lambda/01-start-function.py',
      runtime: Runtime.PYTHON_3_9,
    })

    const computeFunction = new PythonFunction(this, 'LambdaMap-Compute', {
      entry: 'lib/lambda/02-compute-map.py',
      runtime: Runtime.PYTHON_3_9,
      timeout: Duration.minutes(15),
      memorySize: 4092,
    })

    const sumFunction = new PythonFunction(this, 'LambdaSum', {
      entry: 'lib/lambda/03-sum.py',
      runtime: Runtime.PYTHON_3_9,
      timeout: Duration.minutes(15),
      memorySize: 4092,
    })
  }
}
This is now a simple CDK stack that creates and deploys our Lambda functions. PythonFunction is a special construct that bundles Python Lambda functions as part of the CDK deployment. The code for each function lives in the files below, and you can see what each one is doing.
The start function is the simplest. All it does is take an input size and return an array of that length. The compute step uses this array to determine how large the map will be.
01-start-function.py
def handler(event, context):
    event.update({"items": [v for v in range(0, event["size"])]})
    return event

The compute function is the main part of this pattern. This is where all the calculations are done in parallel. You can modify what this Lambda does to change the type of calculation performed.

02-compute-map.py
import math


def multinomial(*vals):
    # Multinomial coefficient: (sum of vals)! / (v1! * v2! * ... * vn!)
    result = 1
    for i in vals:
        result *= math.factorial(i)
    return math.factorial(sum(vals)) // result


def handler(event, context):
    result = 0

    i = int(event["index"])
    for j in range(0, event["size"] // 2 + 1):
        if 2*j > i:
            break

        for k in range(0, event["size"] // 3 + 1):
            if 2*j + 3*k > i:
                break

            for l in range(0, event["size"] // 4 + 1):
                if 2*j + 3*k + 4*l > i:
                    break

                result += multinomial(i - 2*j - 3*k - 4*l, j, k, l) * 128 ** (
                    i - 2*j - 3*k - 4*l) * 1863 ** (j) * 42451 ** (k) * 78341 ** (l)

    event.update({"result": result})
    return event
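
Before paying for a full distributed map run it can be worth sanity-checking the handler locally for a single index. Below is a minimal sketch (not part of the repository), assuming the file layout used in the stack definition (lib/lambda/02-compute-map.py).

import importlib.util

# Load the handler from its file, since the module name isn't a valid Python identifier.
spec = importlib.util.spec_from_file_location("compute_map", "lib/lambda/02-compute-map.py")
compute_map = importlib.util.module_from_spec(spec)
spec.loader.exec_module(compute_map)

# Run a single map iteration locally (index 5 of a size-1024 job).
out = compute_map.handler({"index": 5, "size": 1024}, None)
print(out["result"])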

The last part of the step function sums all the results from the compute step. Since this step function is based on a distributed map, the results are stored in an S3 bucket (more on this later). To read the results we download the manifest and result files from S3 and return the sum.

03-sum.py
import boto3
import json

s3 = boto3.client("s3")


def handler(event, context):
    s = 0
    results = event["results"]
    bucket = results["ResultWriterDetails"]["Bucket"]
    key = results["ResultWriterDetails"]["Key"]

    # Download the manifest written by the distributed map's ResultWriter
    data = s3.get_object(Bucket=bucket, Key=key)
    manifest = json.loads(data["Body"].read().decode("utf-8"))

    # Each SUCCEEDED entry points at a file containing a batch of map results
    succeeded = manifest["ResultFiles"]["SUCCEEDED"]
    for obj in succeeded:
        data = s3.get_object(Bucket=bucket, Key=obj["Key"])
        successful_results = json.loads(data["Body"].read().decode("utf-8"))
        for result in successful_results:
            payload = json.loads(result["Output"])
            s += int(payload["Payload"]["result"])

    event.update({"total_sum": s})
    return event
The compute function is invoked inside a distributed map, a high-concurrency map mode designed to process a large number of inputs. Distributed maps only support writing results to an S3 bucket, as there can be upwards of tens of thousands of results. To use this in the stack we need to create an S3 bucket to store the results of the map.
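
For reference, this is roughly the shape of the data the sum function walks: the ResultWriterDetails in the state output points at a manifest, and each SUCCEEDED entry in that manifest points at a file of per-item results. The values below are illustrative and extra manifest fields are omitted.

# Manifest written by the ResultWriter, trimmed to the fields the sum function reads.
manifest = {
    "ResultFiles": {
        "FAILED": [],
        "PENDING": [],
        "SUCCEEDED": [
            {"Key": "process_output/abc123/SUCCEEDED_0.json"},
        ],
    },
}

# Each SUCCEEDED file holds a list of records; the Lambda result sits in the
# "Output" field as a JSON string, which is why the sum function calls json.loads() twice.
record = {"Output": '{"Payload": {"index": 2, "size": 1024, "result": 16511}}'}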
At the time of writing CDK does not have a construct for distributed maps, so as a workaround we create a dummy map and use a custom state to encapsulate the mapping functionality. It sounds complicated, but the main difference is that the distributed map is defined in a stateJson object instead of a strictly typed construct.
import * as cdk from 'aws-cdk-lib'
import { Construct } from 'constructs'
import { Runtime } from 'aws-cdk-lib/aws-lambda'
import { Aws, Duration } from 'aws-cdk-lib'
import { Bucket } from 'aws-cdk-lib/aws-s3'
import { CustomState, Map } from 'aws-cdk-lib/aws-stepfunctions'
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks'
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'

export class HpcBatchStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props)

    const startFunction = new PythonFunction(this, 'LambdaStart', {
      entry: 'lib/lambda/01-start-function.py',
      runtime: Runtime.PYTHON_3_9,
    })

    const computeFunction = new PythonFunction(this, 'LambdaMap-Compute', {
      entry: 'lib/lambda/02-compute-map.py',
      runtime: Runtime.PYTHON_3_9,
      timeout: Duration.minutes(15),
      memorySize: 4092,
    })

    const sumFunction = new PythonFunction(this, 'LambdaSum', {
      entry: 'lib/lambda/03-sum.py',
      runtime: Runtime.PYTHON_3_9,
      timeout: Duration.minutes(15),
      memorySize: 4092,
    })

    const bucket = new Bucket(this, 'Bucket', {
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    })

    // Dummy map used only to generate the iterator definition for the custom state
    const dummyMap = new Map(this, 'DummyMap')

    dummyMap.iterator(
      new LambdaInvoke(this, 'LambdaMap-Compute-Invoke', {
        lambdaFunction: computeFunction,
      }).addRetry({
        errors: ['Lambda.TooManyRequestsException'],
        maxAttempts: 10,
      })
    )

    const distributedMap = new CustomState(this, 'DistributedMap', {
      stateJson: {
        Type: 'Map',
        ItemsPath: '$.Payload.items',
        ItemSelector: {
          'index.$': '$$.Map.Item.Value',
          'size.$': '$.Payload.size',
        },
        ItemProcessor: {
          ...(dummyMap.toStateJson() as any).Iterator,
          ProcessorConfig: {
            Mode: 'DISTRIBUTED',
            ExecutionType: 'STANDARD',
          },
        },
        ResultWriter: {
          Resource: 'arn:aws:states:::s3:putObject',
          Parameters: {
            Bucket: bucket.bucketName,
            Prefix: 'process_output',
          },
        },
        ResultPath: '$.results',
      },
    })
  }
}

With the map created we can now create the step function and build our state machine definition. The start and sum functions must each be wrapped in a LambdaInvoke task state which can then be added to the state machine definition.

The last part is to add the proper permissions. The custom distributed map state doesn't add the required permissions to the state machine automatically, so we need to define these ourselves, along with the permissions the state machine needs to access the results bucket.

import * as cdk from 'aws-cdk-lib'
import { Construct } from 'constructs'
import { Runtime } from 'aws-cdk-lib/aws-lambda'
import { Aws, Duration } from 'aws-cdk-lib'
import { Effect, PolicyStatement } from 'aws-cdk-lib/aws-iam'
import { Bucket } from 'aws-cdk-lib/aws-s3'
import { CustomState, Map, StateMachine } from 'aws-cdk-lib/aws-stepfunctions'
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks'
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'

export class HpcBatchStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props)

    const startFunction = new PythonFunction(this, 'LambdaStart', {
      entry: 'lib/lambda/01-start-function.py',
      runtime: Runtime.PYTHON_3_9,
    })

    const computeFunction = new PythonFunction(this, 'LambdaMap-Compute', {
      entry: 'lib/lambda/02-compute-map.py',
      runtime: Runtime.PYTHON_3_9,
      timeout: Duration.minutes(15),
      memorySize: 4092,
    })

    const sumFunction = new PythonFunction(this, 'LambdaSum', {
      entry: 'lib/lambda/03-sum.py',
      runtime: Runtime.PYTHON_3_9,
      timeout: Duration.minutes(15),
      memorySize: 4092,
    })

    const bucket = new Bucket(this, 'Bucket', {
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    })

    // Dummy map used only to generate the iterator definition for the custom state
    const dummyMap = new Map(this, 'DummyMap')

    dummyMap.iterator(
      new LambdaInvoke(this, 'LambdaMap-Compute-Invoke', {
        lambdaFunction: computeFunction,
      }).addRetry({
        errors: ['Lambda.TooManyRequestsException'],
        maxAttempts: 10,
      })
    )

    const distributedMap = new CustomState(this, 'DistributedMap', {
      stateJson: {
        Type: 'Map',
        ItemsPath: '$.Payload.items',
        ItemSelector: {
          'index.$': '$$.Map.Item.Value',
          'size.$': '$.Payload.size',
        },
        ItemProcessor: {
          ...(dummyMap.toStateJson() as any).Iterator,
          ProcessorConfig: {
            Mode: 'DISTRIBUTED',
            ExecutionType: 'STANDARD',
          },
        },
        ResultWriter: {
          Resource: 'arn:aws:states:::s3:putObject',
          Parameters: {
            Bucket: bucket.bucketName,
            Prefix: 'process_output',
          },
        },
        ResultPath: '$.results',
      },
    })

    const start = new LambdaInvoke(this, 'LambdaStart-Invoke', {
      lambdaFunction: startFunction,
    })

    const sum = new LambdaInvoke(this, 'LambdaSum-Invoke', {
      lambdaFunction: sumFunction,
    })

    const sm = new StateMachine(this, 'StateMachine', {
      definition: start.next(distributedMap).next(sum),
      stateMachineName: 'python-compute',
    })

    // The distributed map starts child executions of this state machine, so the
    // state machine role needs permission to start, describe and stop them.
    sm.addToRolePolicy(
      new PolicyStatement({
        actions: ['states:StartExecution'],
        effect: Effect.ALLOW,
        resources: [
          `arn:aws:states:${Aws.REGION}:${Aws.ACCOUNT_ID}:stateMachine:python-compute`,
        ],
      })
    )
    sm.addToRolePolicy(
      new PolicyStatement({
        actions: ['states:DescribeExecution', 'states:StopExecution'],
        effect: Effect.ALLOW,
        resources: [
          `arn:aws:states:${Aws.REGION}:${Aws.ACCOUNT_ID}:execution:python-compute/*`,
        ],
      })
    )

    computeFunction.grantInvoke(sm)
    bucket.grantRead(sumFunction)
    bucket.grantReadWrite(sm)
  }
}

With our stack defined we can now deploy it into our AWS environment.

cdk deploy

Once the stack has finished deploying we can go ahead and run the step function, either from the console or from the command line using the following command.

aws stepfunctions start-execution --state-machine-arn ${STEP_FUNCTION_ARN} --input '{"size": 1024}'
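
The execution is asynchronous, so start-execution returns immediately. If you would rather wait for the answer from a script than the console, here is a minimal polling sketch using boto3; the execution ARN is a placeholder for the one returned by start-execution, and the final print assumes the default LambdaInvoke result wrapping shown earlier.

import json
import time

import boto3

sfn = boto3.client("stepfunctions")

# Placeholder: substitute the executionArn returned by start-execution.
execution_arn = "arn:aws:states:ap-southeast-2:123456789012:execution:python-compute:demo"

# Poll until the execution leaves the RUNNING state.
status = "RUNNING"
while status == "RUNNING":
    time.sleep(30)
    execution = sfn.describe_execution(executionArn=execution_arn)
    status = execution["status"]

# On success the state machine output is the sum task's result.
if status == "SUCCEEDED":
    print(json.loads(execution["output"])["Payload"]["total_sum"])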

We can view the progress of the invoked function inside the AWS console.

Step Function Graph
For a size of 1024 the output should be:
1677456018789067663671372738255670764586666597686358566342193756477221410140499187343423753590190199654903230120368437249793290025759621529630404421136354625785122473093734810631770853046956870724370537532675944320297140991286636242962828509522896731720351422709507309742403772433831515763569061347982793728937917572080593807721678191976699977719674314250327087197536114392694712320764190601504092578306695063185108049423264161086379202774496327833437273803923931031146560554703801730621556487894072813073555790385645230239490974885519293037996402872187477959587210528743364567470575017299075123610474254218581639139104495239118605213296266198809443485041387863601799770891713528730240629435323155298429395281875648512786576807162092119814931502875085084053120636196674089105614977016355729804210924608673697818048871566969677306244896959001190445265195146904163239058721549521685181751437989030383445074683101235320605991758526549304184555629694108856860688911930317353959983356785041251114914717024022061684408419230813821921008835782963210944208406546057433847471866336248211674420644471918730631408790090222462790300760269382594886932239749863220241471291953258601231882750859903013757817584299021416432641436444382222359068666916168934681341555268963284874563490553001754860053401319713961265977718481730637323292265201562353086893068836132440599823988341463937485970223586574517384494221296819341927010698101325081516671698257916708204402700857108121417108184267871187379654072523285584343498661530865426494266260393034825303813970691859316212576594619119273015999224700053507537306423576154791309073554143586292970890756741161143878983423072447447689108544561968121447046968479504816435757663122399681481908096316621059435250221476031858134465163710664860997270491585523489463905062212003240536028402046972514849386620166477867982660799873745859927434956962950275364099456632182513680425997526761377321358397170747982796000087621659099222272458323271879496411938826213746019232238288919940233544388200088733750022480376838699963245142458831844654215168317429459308500884007902267122773772195044468289767131310310944795580971908504521520668188543150697167889962707563447902944015868983214339608429457

Which is a pretty big number. With each function only computing an individual index, the total run took around 10 minutes. This was mostly due to the unoptimised Python code I have written; if you choose to refactor the example you can get vastly faster speeds.

So there you have it: a simple pattern for running HPC workloads using Step Functions. I've used this template to do lots of different things, like copying a large number of files from S3 into EFS. There are plenty of use cases for this design, so feel free to tweet (or toot) at me if you end up using this!
