Worker Functions
With Lambda, CloudWatch Events, and S3
Lambda isn’t just for HTTP functions. Another application of a Go Lambda function is one that we will invoke manually or automatically to do some work. To accomplish this we need something to work against such as an S3 bucket, and the the Lambda Invoke API or CloudWatch Events to trigger our worker.
Note: This is part of a series about writing and Go Functions-as-a-Service on AWS Lambda and related services like API Gateway, S3 and X-Ray.
If you’d like to experiment with these techniques yourself, check out https://github.com/nzoschke/gofaas for a boilerplate app with all the pieces configured correctly and explained in depth.
Go Code
First, we write a function that will do some work then save a report to S3. We anticipate invoking this manually with custom data like a requestor and a start time, so we define a custom event.
import "github.com/aws/aws-sdk-go/service/s3"
var S3 = s3.New(session.Must(session.NewSession()))
type WorkerEvent struct {
SourceIP string `json:"source_ip"`
TimeEnd time.Time `json:"time_end"`
TimeStart time.Time `json:"time_start"`
}
func Worker(ctx context.Context, e WorkerEvent) error {
// perform work here
e.TimeEnd = time.Now()
b, err := json.Marshal(e)
if err != nil {
return errors.WithStack(err)
}
_, err = S3.PutObjectWithContext(ctx, &s3.PutObjectInput{
Body: bytes.NewReader(b),
Bucket: aws.String(os.Getenv("BUCKET")),
Key: aws.String(uuid.NewV4().String()),
})
return errors.WithStack(err)
}
From worker.go
This function simply returns an error to tell Lambda if it was successful or not.
AWS Config
Next we add the config for our function and the S3 bucket it uses:
Resources:
Bucket:
Type: AWS::S3::Bucket
WorkerFunction:
Properties:
CodeUri: ./handlers/worker/main.zip
Environment:
Variables:
BUCKET: !Ref Bucket
Handler: main
Policies:
- S3CrudPolicy:
BucketName: !Ref Bucket
Runtime: go1.x
Timeout: 15
Type: AWS::Serverless::Function
From template.yml
Note the longer timeout (15s versus default 3s) we give the worker in case it needs it. Also note the environment variable and policy for the bucket. When we deploy this, AWS will set up the bucket and permissions before creating the Lambda function.
Go Code
Next, we write another function that cleans up the bucket. This will be called automatically by AWS so it takes a CloudWatch event.
import "github.com/aws/aws-lambda-go/events"
func WorkerPeriodic(ctx context.Context, e events.CloudWatchEvent) error {
iter := s3manager.NewDeleteListIterator(S3, &s3.ListObjectsInput{
Bucket: aws.String(os.Getenv("BUCKET")),
})
err := s3manager.NewBatchDeleteWithClient(S3).Delete(ctx, iter)
return errors.WithStack(err)
}
From worker.go
AWS Config
Next we add the config for our function and the S3 bucket it uses:
Resources:
WorkerPeriodicFunction:
Properties:
CodeUri: ./handlers/worker-periodic/main.zip
Environment:
Variables:
BUCKET: !Ref Bucket
Events:
Request:
Properties:
Schedule: rate(1 day)
Type: Schedule
Handler: main
Policies:
- Statement:
- Action:
- s3:DeleteObject
Effect: Allow
Resource: !Sub "arn:aws:s3:::${Bucket}/*"
- Action:
- s3:ListBucket
Effect: Allow
Resource: !Sub "arn:aws:s3:::${Bucket}"
Runtime: go1.x
Timeout: 15
Type: AWS::Serverless::Function
From template.yml
Note the rate(1 day)
ScheduleExpression. We could make this more frequent with rate(1 minute)
or more specific with cron(0 12 * * ? *)
(every day at 12). When we deploy this AWS will automatically invoke our function on this schedule. See the CloudWatch Schedule Expressions guide for more details.
Also note the specific policy. At the time of writing, the simpler S3CrudPolicy
doesn’t actually add a delete permission, so we take matters into our own hands. We aim for the least privilege, so we give our function a single action on the bucket (list), and a single action on its contents (delete). For further reading check out the per-function policies doc.
Package and Deploy
We need to make the boilerplate worker
and worker-periodic
Go programs that Lambda will invoke. Check out the the dev, package, deploy doc for more details.
From here we can assume we have these programs and a single command to deploy:
$ make deploy
cd ./handlers/worker && GOOS=linux go build...
cd ./handlers/worker-periodic && GOOS=linux go build...
aws cloudformation package ...
aws cloudformation deploy ...
Finally we can invoke our function manually:
$ aws lambda invoke --function-name gofaas-WorkerFunction \
--payload '{"time_start": "2018-02-21T15:00:43.511Z"}' \
--log-type Tail --output text --query 'LogResult' out.log | base64 -D
START RequestId: 0bb47628-1718-11e8-ad73-c58e72b8826c Version: $LATEST
2018/02/21 15:01:07 Worker Event: {SourceIP: TimeEnd:0001-01-01 00:00:00 +0000 UTC TimeStart:2018-02-21 15:00:43.511 +0000 UTC}
END RequestId: 0bb47628-1718-11e8-ad73-c58e72b8826c
REPORT RequestId: 0bb47628-1718-11e8-ad73-c58e72b8826c Duration: 11.11 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 41 MB
And we can review logs to see our periodic function called once a day:
$ aws logs filter-log-events --log-group-name '/aws/lambda/gofaas-WorkerPeriodicFunction' --output text --query 'events[*].{Message:message}'
START RequestId: ae1b5451-1727-11e8-991a-85c308f12bbb Version: $LATEST
2018/02/23 03:01:43 WorkerPeriodic Event: ...
END RequestId: ae1b5451-1727-11e8-991a-85c308f12bbb
REPORT RequestId: ae1b5451-1727-11e8-991a-85c308f12bbb Duration: 1236.68 ms Billed Duration: 1300 ms Memory Size: 128 MB Max Memory Used: 45 MB
START RequestId: c96123d4-1727-11e8-b0e4-27c53f455614 Version: $LATEST
2018/02/24 03:01:41 WorkerPeriodic Event: ...
END RequestId: c96123d4-1727-11e8-b0e4-27c53f455614
REPORT RequestId: c96123d4-1727-11e8-b0e4-27c53f455614 Duration: 144.81 ms Billed Duration: 200 ms Memory Size: 128 MB Max Memory Used: 46 MB
Summary
When building worker functions we:
- Design custom events
- Configure scheduled events
- Add a storage service to our stack and policies for our functions
- Write Go funcs for the events that perform work
We no longer have to worry about:
- Work queues
- Worker pools
- Running a scheduler process or service
Lambda makes building workers significantly easier.