AWS लैम्ब्डा 2026 में निश्चित सर्वर रहित कंप्यूट सेवा है। घटनाओं के जवाब में कोड चलाएँ – HTTP अनुरोध, डेटाबेस परिवर्तन, फ़ाइल अपलोड, निर्धारित कार्य – सर्वर को प्रबंधित किए बिना। यह मार्गदर्शिका लैम्ब्डा को पहले फ़ंक्शन से लेकर पायथन और नोड.जेएस के साथ उत्पादन-तैयार सर्वर रहित आर्किटेक्चर तक कवर करती है।
📋 Table of Contents
सर्वर रहित लैम्ब्डा क्यों?
- कोई सर्वर प्रबंधन नहीं– AWS स्केलिंग, पैचिंग, उपलब्धता को संभालता है
- प्रति उपयोग भुगतान करें– निष्पादन के प्रति 1 एमएस बिल भेजा गया (पहले 1 मिलियन अनुरोध निःशुल्क/माह)
- अनंत स्केलिंग– 0 से 10,000 समवर्ती निष्पादन तक ऑटो-स्केल
- घटना संचालित की गई– एपीआई गेटवे, एस3, डायनेमोडीबी, एसक्यूएस, इवेंटब्रिज से ट्रिगर
- ठंड शुरू– SnapStart के साथ Python/Node.js के लिए उप-सेकेंड आरंभीकरण
हैलो वर्ल्ड लैम्ब्डा (पायथन)
# lambda_function.py
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event: dict, context) -> dict:
logger.info(f"Event: {json.dumps(event)}")
# context has: function_name, remaining_time_in_millis, memory_limit_in_mb
logger.info(f"Remaining time: {context.get_remaining_time_in_millis()}ms")
name = event.get("queryStringParameters", {}).get("name", "World")
return {
"statusCode": 200,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
},
"body": json.dumps({
"message": f"Hello, {name}!",
"requestId": context.aws_request_id,
})
}
AWS SAM के साथ परिनियोजन करें
# Install AWS SAM CLI
pip install aws-sam-cli
# Initialize project
sam init --runtime python3.12 --app-template hello-world
# Build
sam build
# Local test
sam local invoke HelloWorldFunction --event events/event.json
sam local start-api # runs locally on port 3000
# Deploy to AWS
sam deploy --guided # first time (interactive)
sam deploy # subsequent deploys
# template.yaml (SAM template)
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Globals:
Function:
Runtime: python3.12
Timeout: 30
MemorySize: 256
Environment:
Variables:
ENVIRONMENT: !Ref Environment
Layers:
- !Ref DependenciesLayer
Parameters:
Environment:
Type: String
AllowedValues: [dev, staging, production]
Resources:
ApiFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: api.handler
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref UsersTable
- S3ReadPolicy:
BucketName: !Ref AssetsBucket
Events:
Api:
Type: Api
Properties:
Path: /{proxy+}
Method: ANY
Schedule:
Type: Schedule
Properties:
Schedule: rate(5 minutes)
DependenciesLayer:
Type: AWS::Serverless::LayerVersion
Properties:
ContentUri: dependencies/
CompatibleRuntimes: [python3.12]
UsersTable:
Type: AWS::DynamoDB::Table
Properties:
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: pk
AttributeType: S
- AttributeName: sk
AttributeType: S
KeySchema:
- AttributeName: pk
KeyType: HASH
- AttributeName: sk
KeyType: RANGE
लैम्ब्डा (मैंगम) पर फास्टएपीआई
pip install fastapi mangum
# Package: put in layer or zip with requirements.txt
# main.py
from fastapi import FastAPI, Depends, HTTPException
from mangum import Mangum
from pydantic import BaseModel
app = FastAPI(
title="My API",
root_path="/production" # API Gateway stage
)
class User(BaseModel):
id: int
name: str
email: str
@app.get("/users/{user_id}")
async def get_user(user_id: int) -> User:
user = await db.get_user(user_id)
if not user:
raise HTTPException(404, "User not found")
return user
@app.post("/users")
async def create_user(user: User) -> User:
return await db.create_user(user)
# Lambda handler
handler = Mangum(app, lifespan="off")
घटना स्रोत
# S3 trigger — process uploaded files
def handle_s3_upload(event: dict, context) -> None:
for record in event.get('Records', []):
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
size = record['s3']['object']['size']
print(f"Processing {key} ({size} bytes) from {bucket}")
process_file(bucket, key)
# SQS trigger — process messages
def handle_sqs_messages(event: dict, context) -> dict:
batch_failures = []
for record in event.get('Records', []):
try:
body = json.loads(record['body'])
process_message(body)
except Exception as e:
# Return failed message IDs for retry
batch_failures.append({"itemIdentifier": record['messageId']})
return {"batchItemFailures": batch_failures}
# DynamoDB Streams trigger
def handle_db_changes(event: dict, context) -> None:
for record in event.get('Records', []):
if record['eventName'] == 'INSERT':
new_item = record['dynamodb']['NewImage']
on_user_created(new_item)
elif record['eventName'] == 'MODIFY':
old = record['dynamodb']['OldImage']
new = record['dynamodb']['NewImage']
on_user_updated(old, new)
# EventBridge scheduled task
def run_nightly_report(event: dict, context) -> None:
print(f"Running nightly report at {event['time']}")
generate_and_email_report()
लैम्ब्डा सर्वोत्तम अभ्यास
- कार्यों को केंद्रित रखें– एक कार्य, एक उद्देश्य
- कनेक्शन का पुन: उपयोग करें– हैंडलर के बाहर डीबी/रेडिस कनेक्शन प्रारंभ करें (वे गर्म आमंत्रणों में बने रहते हैं)
- लैम्ब्डा परतों का प्रयोग करें– विभिन्न कार्यों में निर्भरताएँ साझा करें, परिनियोजन पैकेज का आकार कम करें
- उपयुक्त मेमोरी सेट करें— अधिक मेमोरी = अधिक सीपीयू, अक्सर तेज़+सस्ता (परीक्षण 128-3008एमबी)
- स्नैपस्टार्ट सक्षम करें(जावा) /Graviton2 का उपयोग करें(बेहतर कीमत-पूर्ण)
- ठंड की शुरूआत पर नजर रखें– एक्स-रे ट्रेसिंग का उपयोग करें, महत्वपूर्ण पथों के लिए फ़ंक्शन को गर्म रखें
- निष्क्रिय कार्य– लैम्ब्डा कई बार (कम से कम एक बार) इनवॉइस कर सकता है, तदनुसार डिज़ाइन कर सकता है
2026 में AWS लैम्ब्डा इवेंट-संचालित, एपीआई और शेड्यूल किए गए वर्कलोड को चलाने का सबसे लागत प्रभावी तरीका है। स्थानीय विकास और परिनियोजन के लिए SAM का उपयोग करें, REST API के लिए FastAPI+Mangum का उपयोग करें, और अपने कार्यभार के लिए सही मेमोरी को कॉन्फ़िगर करें। लगातार गर्म एपीआई के लिए, हमेशा चालू सेवाओं के लिए प्रोविजन्ड कॉनकरेंसी या फार्गेट के साथ लैम्ब्डा पर विचार करें।
🔗 Share this article
✍️ Leave a Comment