Written by Robert Koch
Let's have a look at what we're deploying in this project. I needed to evaluate a number of large equations and then sum all of the results together. You can read the previous article for the full details of what this is calculating, but to quickly get you up to speed: we're computing the total number of valid S3 key values, i.e. the number of valid Unicode character combinations. The architecture diagram below shows how this project will compute the result.
As you can see, the design of what we're going to build is very simple. Three Lambda functions make up the compute component of our cluster.
To get started, initialise a new CDK app:

```shell
cdk init app --language typescript
```
Inside the `lib` directory will be a `*-stack.ts` file. Here is where we will define new constructs for our stack. To start with, let's create constructs for our three functions.

```typescript
import * as cdk from 'aws-cdk-lib'
import { Construct } from 'constructs'
import { Runtime } from 'aws-cdk-lib/aws-lambda'
import { Aws, Duration } from 'aws-cdk-lib'
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'

export class HpcBatchStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props)

    const startFunction = new PythonFunction(this, 'LambdaStart', {
      entry: 'lib/lambda/01-start-function.py',
      runtime: Runtime.PYTHON_3_9,
    })

    const computeFunction = new PythonFunction(this, 'LambdaMap-Compute', {
      entry: 'lib/lambda/02-compute-map.py',
      runtime: Runtime.PYTHON_3_9,
      timeout: Duration.minutes(15),
      memorySize: 4092,
    })

    const sumFunction = new PythonFunction(this, 'LambdaSum', {
      entry: 'lib/lambda/03-sum.py',
      runtime: Runtime.PYTHON_3_9,
      timeout: Duration.minutes(15),
      memorySize: 4092,
    })
  }
}
```
`PythonFunction` is a special construct that bundles Python Lambda functions as part of the CDK deployment. The code for each function is located in the files referenced above; you can see what they're doing below.

The start function takes an input `size` and returns an array of length `size`. This will be used in the compute step to specify how large the map will be.

```python
def handler(event, context):
    event.update({"items": [v for v in range(0, event["size"])]})
    return event
```
The compute function is the main part of this pattern. This is where all the calculations are done in parallel. You can modify what this Lambda does to change the type of calculation performed.
```python
import math


def multinomial(*vals):
    result = 1
    for i in vals:
        result *= math.factorial(i)
    return math.factorial(sum(vals)) // result


def handler(event, context):
    result = 0

    i = int(event["index"])
    for j in range(0, event["size"] // 2 + 1):
        if 2*j > i:
            break

        # @show i, j
        for k in range(0, event["size"] // 3 + 1):
            if 2*j + 3*k > i:
                break

            for l in range(0, event["size"] // 4 + 1):
                if 2*j + 3*k + 4*l > i:
                    break

                result += multinomial(i - 2*j - 3*k - 4*l, j, k, l) \
                    * 128 ** (i - 2*j - 3*k - 4*l) * 1863 ** j * 42451 ** k * 78341 ** l

    event.update({"result": result})
    return event
```
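The nested sum above counts sequences of "blocks" of lengths 1 to 4, with 128, 1863, 42451 and 78341 choices per block length respectively, whose lengths total the given index. That combinatorial reading means it can be cross-checked against a simple dynamic-programming recurrence. This standalone sketch (the helper names `per_index` and `dp` are mine, not from the project) verifies the two agree for small indices:

```python
import math

def multinomial(*vals):
    result = 1
    for v in vals:
        result *= math.factorial(v)
    return math.factorial(sum(vals)) // result

def per_index(i, size):
    # Same nested-loop sum as the compute handler.
    result = 0
    for j in range(0, size // 2 + 1):
        if 2*j > i:
            break
        for k in range(0, size // 3 + 1):
            if 2*j + 3*k > i:
                break
            for l in range(0, size // 4 + 1):
                if 2*j + 3*k + 4*l > i:
                    break
                a = i - 2*j - 3*k - 4*l
                result += multinomial(a, j, k, l) * 128 ** a * 1863 ** j * 42451 ** k * 78341 ** l
    return result

def dp(n):
    # f(m) = 128 f(m-1) + 1863 f(m-2) + 42451 f(m-3) + 78341 f(m-4)
    f = [1] + [0] * n
    for m in range(1, n + 1):
        for width, choices in ((1, 128), (2, 1863), (3, 42451), (4, 78341)):
            if m - width >= 0:
                f[m] += choices * f[m - width]
    return f[n]

# e.g. per_index(2, 12) == 128*128 + 1863 == 18247
assert all(per_index(i, 12) == dp(i) for i in range(12))
```

The recurrence is also a much faster way to compute the same numbers, which hints at why the brute-force loops below take minutes per invocation.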
The last part of the step function is to sum all the results from the compute step. Since this step function is based on a distributed map, the results are stored in an S3 bucket (more on this later). To read the results we download the manifest file, then each result file it lists, and return the sum of the results.
```python
import boto3
import json

s3 = boto3.client("s3")


def handler(event, context):
    s = 0
    results = event["results"]
    bucket = results["ResultWriterDetails"]["Bucket"]
    key = results["ResultWriterDetails"]["Key"]

    # The key points at a manifest listing the individual result files.
    data = s3.get_object(Bucket=bucket, Key=key)
    manifest = json.loads(data["Body"].read().decode("utf-8"))

    succeeded = manifest["ResultFiles"]["SUCCEEDED"]
    for obj in succeeded:
        data = s3.get_object(Bucket=bucket, Key=obj["Key"])
        successful_results = json.loads(data["Body"].read().decode("utf-8"))
        for result in successful_results:
            payload = json.loads(result["Output"])
            s += int(payload["Payload"]["result"])

    event.update({"total_sum": s})
    return event
```
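The aggregation loop at the heart of that handler can be exercised without S3. The sample entries below are invented, but they mirror the shape the handler reads from a SUCCEEDED result file, where each entry's `Output` field is the JSON-encoded output of one map iteration:

```python
import json

# Hypothetical entries from a SUCCEEDED result file (values invented).
successful_results = [
    {"Output": json.dumps({"Payload": {"result": 128}})},
    {"Output": json.dumps({"Payload": {"result": 18247}})},
]

# Same decode-and-accumulate loop as the sum handler.
s = 0
for result in successful_results:
    payload = json.loads(result["Output"])
    s += int(payload["Payload"]["result"])

print(s)  # 18375
```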
Next we add the results bucket and the map itself. Note that the `DistributedMap` is defined in a `stateJson` object instead of a strictly typed construct; a dummy `Map` supplies the iterator definition, which we splice into the custom state.

```typescript
import * as cdk from 'aws-cdk-lib'
import { Construct } from 'constructs'
import { Runtime } from 'aws-cdk-lib/aws-lambda'
import { Aws, Duration } from 'aws-cdk-lib'
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'
import { Bucket } from 'aws-cdk-lib/aws-s3'
import { Map, CustomState } from 'aws-cdk-lib/aws-stepfunctions'
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks'

export class HpcBatchStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props)

    const startFunction = new PythonFunction(this, 'LambdaStart', {
      entry: 'lib/lambda/01-start-function.py',
      runtime: Runtime.PYTHON_3_9,
    })

    const computeFunction = new PythonFunction(this, 'LambdaMap-Compute', {
      entry: 'lib/lambda/02-compute-map.py',
      runtime: Runtime.PYTHON_3_9,
      timeout: Duration.minutes(15),
      memorySize: 4092,
    })

    const sumFunction = new PythonFunction(this, 'LambdaSum', {
      entry: 'lib/lambda/03-sum.py',
      runtime: Runtime.PYTHON_3_9,
      timeout: Duration.minutes(15),
      memorySize: 4092,
    })

    const bucket = new Bucket(this, 'Bucket', {
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    })

    // A dummy Map provides an iterator definition that we can splice
    // into the custom state's ItemProcessor below.
    const dummyMap = new Map(this, 'DummyMap')

    dummyMap.iterator(
      new LambdaInvoke(this, 'LambdaMap-Compute-Invoke', {
        lambdaFunction: computeFunction,
      }).addRetry({
        errors: ['Lambda.TooManyRequestsException'],
        maxAttempts: 10,
      })
    )

    const distributedMap = new CustomState(this, 'DistributedMap', {
      stateJson: {
        Type: 'Map',
        ItemsPath: '$.Payload.items',
        ItemSelector: {
          'index.$': '$$.Map.Item.Value',
          'size.$': '$.Payload.size',
        },
        ItemProcessor: {
          ...(dummyMap.toStateJson() as any).Iterator,
          ProcessorConfig: {
            Mode: 'DISTRIBUTED',
            ExecutionType: 'STANDARD',
          },
        },
        ResultWriter: {
          Resource: 'arn:aws:states:::s3:putObject',
          Parameters: {
            Bucket: bucket.bucketName,
            Prefix: 'process_output',
          },
        },
        ResultPath: '$.results',
      },
    })
  }
}
```
With the map created we can now create the step function and build our state machine definition. The start and sum functions must be wrapped in a step function stage which can be added to the state machine definition.
The last part is to add the proper permissions. The custom distributed map doesn't add the required permissions to the state machine automatically, so we need to define these ourselves, along with the permissions the step function needs to access the results bucket.
```typescript
import * as cdk from 'aws-cdk-lib'
import { Construct } from 'constructs'
import { Runtime } from 'aws-cdk-lib/aws-lambda'
import { Aws, Duration } from 'aws-cdk-lib'
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'
import { Bucket } from 'aws-cdk-lib/aws-s3'
import { Map, CustomState, StateMachine } from 'aws-cdk-lib/aws-stepfunctions'
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks'
import { Effect, PolicyStatement } from 'aws-cdk-lib/aws-iam'

export class HpcBatchStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props)

    const startFunction = new PythonFunction(this, 'LambdaStart', {
      entry: 'lib/lambda/01-start-function.py',
      runtime: Runtime.PYTHON_3_9,
    })

    const computeFunction = new PythonFunction(this, 'LambdaMap-Compute', {
      entry: 'lib/lambda/02-compute-map.py',
      runtime: Runtime.PYTHON_3_9,
      timeout: Duration.minutes(15),
      memorySize: 4092,
    })

    const sumFunction = new PythonFunction(this, 'LambdaSum', {
      entry: 'lib/lambda/03-sum.py',
      runtime: Runtime.PYTHON_3_9,
      timeout: Duration.minutes(15),
      memorySize: 4092,
    })

    const bucket = new Bucket(this, 'Bucket', {
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    })

    const dummyMap = new Map(this, 'DummyMap')

    dummyMap.iterator(
      new LambdaInvoke(this, 'LambdaMap-Compute-Invoke', {
        lambdaFunction: computeFunction,
      }).addRetry({
        errors: ['Lambda.TooManyRequestsException'],
        maxAttempts: 10,
      })
    )

    const distributedMap = new CustomState(this, 'DistributedMap', {
      stateJson: {
        Type: 'Map',
        ItemsPath: '$.Payload.items',
        ItemSelector: {
          'index.$': '$$.Map.Item.Value',
          'size.$': '$.Payload.size',
        },
        ItemProcessor: {
          ...(dummyMap.toStateJson() as any).Iterator,
          ProcessorConfig: {
            Mode: 'DISTRIBUTED',
            ExecutionType: 'STANDARD',
          },
        },
        ResultWriter: {
          Resource: 'arn:aws:states:::s3:putObject',
          Parameters: {
            Bucket: bucket.bucketName,
            Prefix: 'process_output',
          },
        },
        ResultPath: '$.results',
      },
    })

    const start = new LambdaInvoke(this, 'LambdaStart-Invoke', {
      lambdaFunction: startFunction,
    })

    const sum = new LambdaInvoke(this, 'LambdaSum-Invoke', {
      lambdaFunction: sumFunction,
    })

    const sm = new StateMachine(this, 'StateMachine', {
      definition: start.next(distributedMap).next(sum),
      stateMachineName: 'python-compute',
    })

    // The distributed map starts child executions of this state machine,
    // so it needs explicit permission to start, inspect and stop them.
    sm.addToRolePolicy(
      new PolicyStatement({
        actions: ['states:StartExecution'],
        effect: Effect.ALLOW,
        resources: [
          `arn:aws:states:${Aws.REGION}:${Aws.ACCOUNT_ID}:stateMachine:python-compute`,
        ],
      })
    )
    sm.addToRolePolicy(
      new PolicyStatement({
        actions: ['states:DescribeExecution', 'states:StopExecution'],
        effect: Effect.ALLOW,
        resources: [
          `arn:aws:states:${Aws.REGION}:${Aws.ACCOUNT_ID}:execution:python-compute/*`,
        ],
      })
    )

    computeFunction.grantInvoke(sm)
    bucket.grantRead(sumFunction)
    bucket.grantReadWrite(sm)
  }
}
```
With our stack defined we can now deploy it into our AWS environment.
```shell
cdk deploy
```
Once the stack has finished deploying we can go ahead and run the step function, either from the console or via the command line using the following command.
```shell
aws stepfunctions start-execution --state-machine-arn ${STEP_FUNCTION_ARN} --input '{"size": 1024}'
```
We can view the progress of the invoked function inside the AWS console.
Running with a size of 1024,
the output should be:16774560187890676636713727382556707645866665976863585663421937564772214101404991873434237535901901996549032301203684372497932900257596215296304044211363546257851224730937348106317708530469568707243705375326759443202971409912866362429628285095228967317203514227095073097424037724338315157635690613479827937289379175720805938077216781919766999777196743142503270871975361143926947123207641906015040925783066950631851080494232641610863792027744963278334372738039239310311465605547038017306215564878940728130735557903856452302394909748855192930379964028721874779595872105287433645674705750172990751236104742542185816391391044952391186052132962661988094434850413878636017997708917135287302406294353231552984293952818756485127865768071620921198149315028750850840531206361966740891056149770163557298042109246086736978180488715669696773062448969590011904452651951469041632390587215495216851817514379890303834450746831012353206059917585265493041845556296941088568606889119303173539599833567850412511149147170240220616844084192308138219210088357829632109442084065460574338474718663362482116744206444719187306314087900902224627903007602693825948869322397498632202414712919532586012318827508599030137578175842990214164326414364443822223590686669161689346813415552689632848745634905530017548600534013197139612659777184817306373232922652015623530868930688361324405998239883414639374859702235865745173844942212968193419270106981013250815166716982579167082044027008571081214171081842678711873796540725232855843434986615308654264942662603930348253038139706918593162125765946191192730159992247000535075373064235761547913090735541435862929708907567411611438789834230724474476891085445619681214470469684795048164357576631223996814819080963166210594352502214760318581344651637106648609972704915855234894639050622120032405360284020469725148493866201664778679826607998737458599274349569629502753640994566321825136804259975267613773213583971707479827960000876216590992222724583232718794964119388262137460
19232238288919940233544388200088733750022480376838699963245142458831844654215168317429459308500884007902267122773772195044468289767131310310944795580971908504521520668188543150697167889962707563447902944015868983214339608429457
Which is a pretty big number. With each function computing only a single index, the total run took around 10 minutes. This was mostly due to the unoptimised Python code I have written; if you choose to refactor the example here you can get vastly faster speeds.
So there you have it, a simple pattern for running HPC workloads using step functions. I've used this template to do lots of different things, like copying a large number of files from S3 into EFS. There are plenty of use cases for this design, so feel free to tweet (or toot) if you end up using this!