AWS Lambda is AWS's serverless compute service. It runs your code in response to events (HTTP requests, database changes, file uploads, scheduled tasks) without you managing servers. This guide covers Lambda from a first function to a production-ready serverless architecture with Python and Node.js.
Why Serverless Lambda?
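- No server management: AWS handles scaling, patching, and availability
- Pay per use: billed per request and per 1 ms of execution time (the free tier includes 1M requests per month)
- Automatic scaling: scales from zero up to your account's concurrency quota (1,000 concurrent executions per Region by default, and it can be raised on request)
- Event-driven: triggered by API Gateway, S3, DynamoDB Streams, SQS, EventBridge, and more
- Fast cold starts: Python and Node.js functions typically initialize in well under a second; at the time of writing, SnapStart (Java, Python 3.12+, .NET 8) reduces start-up time further on supported runtimes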
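To make the pay-per-use point concrete, here is a rough monthly cost estimate. The prices and free-tier figures are assumptions taken from published x86 on-demand list pricing for us-east-1 ($0.20 per million requests, $0.0000166667 per GB-second, 400,000 GB-seconds free per month); check the current pricing page for your Region before relying on them. The function name `estimate_monthly_cost` is hypothetical.

```python
# Rough Lambda cost estimate. All prices below are ASSUMPTIONS
# (x86, us-east-1 list prices); they vary by Region and architecture.
PRICE_PER_MILLION_REQUESTS = 0.20
PRICE_PER_GB_SECOND = 0.0000166667
FREE_REQUESTS = 1_000_000
FREE_GB_SECONDS = 400_000


def estimate_monthly_cost(requests: int, avg_duration_ms: float, memory_mb: int) -> float:
    """Return the estimated monthly bill in USD after the free tier."""
    # Compute usage is measured in GB-seconds: duration x configured memory.
    gb_seconds = requests * (avg_duration_ms / 1000) * (memory_mb / 1024)
    billable_requests = max(0, requests - FREE_REQUESTS)
    billable_gb_seconds = max(0.0, gb_seconds - FREE_GB_SECONDS)
    request_cost = billable_requests / 1_000_000 * PRICE_PER_MILLION_REQUESTS
    compute_cost = billable_gb_seconds * PRICE_PER_GB_SECOND
    return round(request_cost + compute_cost, 2)


if __name__ == "__main__":
    # 5M requests per month, 120 ms average duration, 256 MB memory
    print(estimate_monthly_cost(5_000_000, 120, 256))
```

Running the same function with different `memory_mb` and measured durations is a quick way to compare memory settings, since a larger setting often shortens the duration enough to offset its higher per-millisecond price.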
Hello World Lambda (Python)
# lambda_function.py
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event: dict, context) -> dict:
    logger.info("Event: %s", json.dumps(event))
    # context exposes function_name, memory_limit_in_mb, aws_request_id,
    # and the get_remaining_time_in_millis() method
    logger.info("Remaining time: %dms", context.get_remaining_time_in_millis())

    # API Gateway sends queryStringParameters as null when there is no query
    # string, so fall back to an empty dict before reading from it
    params = event.get("queryStringParameters") or {}
    name = params.get("name", "World")

    return {
        "statusCode": 200,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
        },
        "body": json.dumps({
            "message": f"Hello, {name}!",
            "requestId": context.aws_request_id,
        }),
    }
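Before deploying, a handler like this can be exercised locally with a hand-built event and a stand-in context object. The sketch below is a test harness under stated assumptions, not part of the Lambda runtime: `FakeContext` is a hypothetical stub that mimics only the attributes the handler touches, and `hello_handler` is a condensed copy of the greeting logic so the example runs on its own.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeContext:
    """Hypothetical stand-in for the Lambda context object (local tests only)."""
    function_name: str = "hello-world"
    memory_limit_in_mb: int = 256
    aws_request_id: str = "local-test-request"

    def get_remaining_time_in_millis(self) -> int:
        return 30_000  # pretend a full 30 s timeout remains


def hello_handler(event: dict, context) -> dict:
    """Condensed copy of the greeting logic, kept here so the sketch is runnable."""
    # queryStringParameters may be missing or None, so normalize to a dict
    params = event.get("queryStringParameters") or {}
    name = params.get("name", "World")
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({
            "message": f"Hello, {name}!",
            "requestId": context.aws_request_id,
        }),
    }


if __name__ == "__main__":
    with_name = hello_handler({"queryStringParameters": {"name": "Ada"}}, FakeContext())
    no_query = hello_handler({"queryStringParameters": None}, FakeContext())
    print(json.loads(with_name["body"])["message"])
    print(json.loads(no_query["body"])["message"])
```

The second call matters: a request without a query string carries `queryStringParameters: null`, which is an easy case to miss when testing only with hand-written happy-path events.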
Deploy with AWS SAM
# Install AWS SAM CLI
pip install aws-sam-cli
# Initialize project
sam init --runtime python3.12 --app-template hello-world
# Build
sam build
# Local test
sam local invoke HelloWorldFunction --event events/event.json
sam local start-api # runs locally on port 3000
# Deploy to AWS
sam deploy --guided # first time (interactive)
sam deploy # subsequent deploys
# template.yaml (SAM template)
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Globals:
  Function:
    Runtime: python3.12
    Timeout: 30
    MemorySize: 256
    Environment:
      Variables:
        ENVIRONMENT: !Ref Environment
    Layers:
      - !Ref DependenciesLayer

Parameters:
  Environment:
    Type: String
    AllowedValues: [dev, staging, production]

Resources:
  ApiFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: api.handler
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref UsersTable
        - S3ReadPolicy:
            BucketName: !Ref AssetsBucket
      Events:
        Api:
          Type: Api
          Properties:
            Path: /{proxy+}
            Method: ANY
        Schedule:
          Type: Schedule
          Properties:
            Schedule: rate(5 minutes)

  DependenciesLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      ContentUri: dependencies/
      CompatibleRuntimes: [python3.12]

  # Bucket referenced by S3ReadPolicy above
  AssetsBucket:
    Type: AWS::S3::Bucket

  UsersTable:
    Type: AWS::DynamoDB::Table
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
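The template attaches both an `Api` event and a `Schedule` event to the same function, so `api.handler` receives two different event shapes. One way to tell them apart is sketched below. It assumes the REST API proxy payload (which carries an `httpMethod` key) and the EventBridge scheduled-event payload (which carries `"source": "aws.events"`). `handle_http` and `handle_schedule` are hypothetical helper names, and reading `ENVIRONMENT` mirrors the variable set under `Globals`.

```python
import json
import os


def handle_http(event: dict) -> dict:
    """Hypothetical HTTP branch: echo the method, path, and environment."""
    body = {
        "method": event["httpMethod"],
        "path": event.get("path", "/"),
        "environment": os.environ.get("ENVIRONMENT", "dev"),
    }
    return {"statusCode": 200, "body": json.dumps(body)}


def handle_schedule(event: dict) -> dict:
    """Hypothetical scheduled branch: report when the rule fired."""
    return {"ran_at": event.get("time")}


def handler(event: dict, context) -> dict:
    # Scheduled invocations from EventBridge identify themselves via "source".
    if event.get("source") == "aws.events":
        return handle_schedule(event)
    # REST API proxy events always include the HTTP method.
    if "httpMethod" in event:
        return handle_http(event)
    raise ValueError("Unrecognized event shape")


if __name__ == "__main__":
    print(handler({"httpMethod": "GET", "path": "/users"}, None))
    print(handler({"source": "aws.events", "time": "2026-01-01T00:00:00Z"}, None))
```

Splitting the scheduled work into its own function is usually cleaner in the long run; the dispatch above is mainly useful when a single deployment unit is preferred.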
FastAPI on Lambda (Mangum)
pip install fastapi mangum
# Package: put in layer or zip with requirements.txt
# main.py
from fastapi import FastAPI, HTTPException
from mangum import Mangum
from pydantic import BaseModel

# NOTE: `db` below stands for your own async data-access layer; it is not defined here.

app = FastAPI(
    title="My API",
    root_path="/production",  # API Gateway stage
)


class User(BaseModel):
    id: int
    name: str
    email: str


@app.get("/users/{user_id}")
async def get_user(user_id: int) -> User:
    user = await db.get_user(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user


@app.post("/users")
async def create_user(user: User) -> User:
    return await db.create_user(user)


# Lambda handler: Mangum adapts API Gateway events to ASGI
handler = Mangum(app, lifespan="off")
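The routes call `db.get_user` and `db.create_user`, which the listing leaves undefined. For local experiments, an in-memory stand-in with the same two async methods is enough. Everything here is an assumption for illustration: the class name `InMemoryUserStore`, the `UserRecord` dataclass used in place of the pydantic model, and the choice to return `None` for a missing user (which is what lets the route raise its 404).

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class UserRecord:
    """Stdlib stand-in for the pydantic User model (same three fields)."""
    id: int
    name: str
    email: str


class InMemoryUserStore:
    """Hypothetical replacement for `db`, for local tests only."""

    def __init__(self) -> None:
        self._users: dict[int, UserRecord] = {}

    async def get_user(self, user_id: int) -> Optional[UserRecord]:
        # Returns None when the user does not exist
        return self._users.get(user_id)

    async def create_user(self, user: UserRecord) -> UserRecord:
        self._users[user.id] = user
        return user


db = InMemoryUserStore()


async def demo() -> tuple:
    created = await db.create_user(UserRecord(1, "Ada", "ada@example.com"))
    found = await db.get_user(1)
    missing = await db.get_user(2)
    return created, found, missing


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

State held in memory does not survive across Lambda execution environments, so a store like this only makes sense for local runs and unit tests; a deployed function would back `db` with DynamoDB or another external store.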
Event Sources
# S3 trigger: process uploaded files
from urllib.parse import unquote_plus


def handle_s3_upload(event: dict, context) -> None:
    for record in event.get('Records', []):
        bucket = record['s3']['bucket']['name']
        # Object keys arrive URL-encoded (spaces become '+'), so decode before use
        key = unquote_plus(record['s3']['object']['key'])
        size = record['s3']['object']['size']
        print(f"Processing {key} ({size} bytes) from {bucket}")
        process_file(bucket, key)  # your own processing logic
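One detail worth checking in S3 handlers: object keys in the notification payload are URL-encoded, so a file named `my photo (1).jpg` does not arrive verbatim. The sketch below decodes the key with the standard library before using it. The sample event is a trimmed, hand-written approximation of an S3 notification, and `extract_objects` is a hypothetical helper name.

```python
from urllib.parse import unquote_plus


def extract_objects(event: dict) -> list[tuple[str, str, int]]:
    """Return (bucket, decoded_key, size) for each record in an S3 event."""
    objects = []
    for record in event.get("Records", []):
        s3 = record["s3"]
        # unquote_plus turns '+' into spaces and decodes %XX escapes
        key = unquote_plus(s3["object"]["key"])
        # size is absent on some event types (e.g. deletions), so default to 0
        size = s3["object"].get("size", 0)
        objects.append((s3["bucket"]["name"], key, size))
    return objects


# Hand-written sample, trimmed to the fields used above
SAMPLE_EVENT = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "example-uploads"},
                "object": {"key": "uploads/my+photo+%281%29.jpg", "size": 2048},
            }
        }
    ]
}

if __name__ == "__main__":
    for bucket, key, size in extract_objects(SAMPLE_EVENT):
        print(f"{bucket}: {key} ({size} bytes)")
```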
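# SQS trigger: process messages
# Requires ReportBatchItemFailures on the event source mapping
import json
import logging

logger = logging.getLogger()


def handle_sqs_messages(event: dict, context) -> dict:
    batch_failures = []
    for record in event.get('Records', []):
        try:
            body = json.loads(record['body'])
            process_message(body)
        except Exception:
            logger.exception("Failed to process message %s", record['messageId'])
            # Report the failed message ID so only that message is retried
            batch_failures.append({"itemIdentifier": record['messageId']})
    return {"batchItemFailures": batch_failures}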
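The partial-batch pattern only takes effect when the event source mapping has `ReportBatchItemFailures` enabled; without it, Lambda treats a normal return as success for the whole batch and the failed messages are deleted along with the rest. The sketch below runs the same pattern against a hand-written sample batch in which two of three messages are bad. `process_message` is a hypothetical stand-in for real business logic.

```python
import json

processed_orders = []


def process_message(body: dict) -> None:
    """Hypothetical business logic: require an order_id, then record it."""
    if "order_id" not in body:
        raise ValueError("missing order_id")
    processed_orders.append(body["order_id"])


def handle_sqs_messages(event: dict, context=None) -> dict:
    failures = []
    for record in event.get("Records", []):
        try:
            process_message(json.loads(record["body"]))
        except Exception:
            # Only the IDs listed here are returned to the queue for retry
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


# Hand-written sample batch: one good message, one malformed, one invalid
SAMPLE_BATCH = {
    "Records": [
        {"messageId": "m-1", "body": '{"order_id": 101}'},
        {"messageId": "m-2", "body": "not json"},
        {"messageId": "m-3", "body": '{"customer": "x"}'},
    ]
}

if __name__ == "__main__":
    print(handle_sqs_messages(SAMPLE_BATCH))
```

Pairing this with a dead-letter queue keeps permanently bad messages (such as the malformed one here) from being retried indefinitely.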
# DynamoDB Streams trigger
# NewImage/OldImage are only present when the stream view type is NEW_AND_OLD_IMAGES
def handle_db_changes(event: dict, context) -> None:
    for record in event.get('Records', []):
        if record['eventName'] == 'INSERT':
            new_item = record['dynamodb']['NewImage']
            on_user_created(new_item)
        elif record['eventName'] == 'MODIFY':
            old = record['dynamodb']['OldImage']
            new = record['dynamodb']['NewImage']
            on_user_updated(old, new)
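`NewImage` and `OldImage` arrive in DynamoDB's typed JSON format (for example `{"S": "alice"}` or `{"N": "42"}`), not as plain Python values. boto3 ships a `TypeDeserializer` for this; the dependency-free sketch below covers only the most common type tags to show what the conversion involves. The names `from_dynamodb` and `image_to_dict` are hypothetical, and binary and set types are deliberately left out.

```python
from decimal import Decimal


def from_dynamodb(attr: dict):
    """Convert one typed attribute value, e.g. {"S": "x"}, to a Python value."""
    tag, value = next(iter(attr.items()))
    if tag == "S":
        return value
    if tag == "N":
        return Decimal(value)  # numbers are transmitted as strings
    if tag == "BOOL":
        return value
    if tag == "NULL":
        return None
    if tag == "L":
        return [from_dynamodb(item) for item in value]
    if tag == "M":
        return {name: from_dynamodb(item) for name, item in value.items()}
    raise ValueError(f"Unsupported DynamoDB type tag: {tag}")


def image_to_dict(image: dict) -> dict:
    """Convert a whole NewImage/OldImage mapping to plain Python values."""
    return {name: from_dynamodb(attr) for name, attr in image.items()}


if __name__ == "__main__":
    new_image = {
        "pk": {"S": "USER#1"},
        "age": {"N": "42"},
        "tags": {"L": [{"S": "admin"}]},
        "active": {"BOOL": True},
    }
    print(image_to_dict(new_image))
```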
# EventBridge scheduled task
def run_nightly_report(event: dict, context) -> None:
    print(f"Running nightly report at {event['time']}")
    generate_and_email_report()
Lambda Best Practices
- Keep functions focused: one function, one purpose
- Reuse connections: initialize DB/Redis clients outside the handler so they persist across warm invocations
- Use Lambda Layers: share dependencies across functions and reduce deployment package size
- Set appropriate memory: CPU scales with memory, so a larger setting is often both faster and cheaper; test across the available range (128 MB to 10,240 MB)
- Enable SnapStart where your runtime supports it (Java, Python 3.12+, and .NET 8 at the time of writing) and run on Graviton (arm64) for better price-performance
- Monitor cold starts: use X-Ray tracing, and use Provisioned Concurrency for latency-critical paths
- Make functions idempotent: asynchronous and queue-based sources deliver events at least once, so the same event can be processed more than once
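The idempotency advice can be sketched with a small guard that records which event IDs have already been handled. This is an illustration under stated assumptions: the in-memory set stands in for a durable store (in practice something like a DynamoDB conditional write, since memory is not shared between execution environments), and `handle_once` and `charge_customer` are hypothetical names.

```python
from typing import Callable

# Stand-in for a durable deduplication store. ASSUMPTION: a real function
# would use an external store, because this set is lost on every cold start.
_seen_event_ids: set[str] = set()

charges: list[str] = []


def handle_once(event_id: str, action: Callable[[], None]) -> bool:
    """Run `action` only if `event_id` has not been handled before.

    Returns True when the action ran and False when it was skipped.
    """
    if event_id in _seen_event_ids:
        return False
    action()
    # Record the ID only after the action succeeds, so failures are retried
    _seen_event_ids.add(event_id)
    return True


def charge_customer(order_id: str) -> None:
    """Hypothetical side effect that must not happen twice."""
    charges.append(order_id)


if __name__ == "__main__":
    # The same event delivered twice triggers only one charge
    print(handle_once("evt-1", lambda: charge_customer("order-101")))
    print(handle_once("evt-1", lambda: charge_customer("order-101")))
    print(charges)
```

Because the ID is recorded after the action, a crash between the two steps still allows a repeat; closing that gap requires the action and the record to be written atomically, which is where a conditional write in the data store comes in.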
AWS Lambda in 2026 is often the most cost-effective way to run event-driven, API, and scheduled workloads, particularly when traffic is spiky or intermittent. Use SAM for local development and deployment, FastAPI with Mangum for REST APIs, and tune the memory setting to your workload. For APIs that must stay consistently warm, consider Lambda with Provisioned Concurrency, or Fargate for always-on services.