Security Patterns & Microservice Security
Overview
Security in microservices requires a multi-layered approach: authentication proves who you are, authorization proves what you can do, and data protection ensures information stays safe. Rather than bolting security on at the end, effective architectures embed security patterns throughout design.
This guide covers proven security patterns for microservices, showing when to use each and real-world trade-offs.
Caveat: Security patterns can add significant complexity. Use /pb-preamble thinking (challenge assumptions, surface trade-offs) and /pb-design-rules thinking (does this pattern serve Simplicity while maintaining Robustness?).
Question threat models. Challenge assumed attack surfaces. Surface the real risk vs. implementation cost trade-off. Don’t add complexity without understanding the actual risk.
Resource Hint: sonnet - Security pattern reference; implementation-level authentication and authorization decisions.
Authentication Patterns
Authentication answers: “Are you who you claim to be?”
Pattern 1: OAuth 2.0 with Authorization Code Flow
When to use: Third-party integrations, user-facing APIs, token-based access
How it works:
- User requests access to their data
- App redirects to authorization server
- User grants permission
- Authorization server returns authorization code
- App exchanges code for access token (backend-to-backend)
- App uses access token to call APIs
Python Example:
from requests_oauthlib import OAuth2Session
from flask import Flask, request, redirect, url_for
app = Flask(__name__)
client_id = "your-client-id"
client_secret = "your-client-secret"
authorization_base_url = "https://auth.example.com/authorize"
token_url = "https://auth.example.com/token"
@app.route("/login")
def login():
oauth = OAuth2Session(client_id, redirect_uri=url_for('callback', _external=True))
authorization_url, state = oauth.authorization_url(authorization_base_url)
session['oauth_state'] = state
return redirect(authorization_url)
@app.route("/callback")
def callback():
oauth = OAuth2Session(client_id, state=session['oauth_state'])
token = oauth.fetch_token(
token_url,
client_secret=client_secret,
authorization_response=request.url
)
session['oauth_token'] = token
return redirect(url_for('dashboard'))
@app.route("/api/user-data")
def get_user_data():
oauth = OAuth2Session(client_id, token=session['oauth_token'])
user_data = oauth.get("https://api.example.com/user").json()
return user_data
JavaScript Example:
// Frontend: Using OAuth 2.0 Authorization Code Flow with PKCE
const clientId = 'your-client-id';
const redirectUri = 'https://yourapp.com/callback';
const authorizationUrl = 'https://auth.example.com/authorize';
function generateCodeChallenge(codeVerifier) {
return btoa(String.fromCharCode.apply(null,
new Uint8Array(codeVerifier)
)).replace(/\+/g, '-').replace(/\//g, '_').replace(/=/g, '');
}
function loginWithOAuth() {
const codeVerifier = generateRandomString(128);
sessionStorage.setItem('code_verifier', codeVerifier);
const codeChallenge = generateCodeChallenge(codeVerifier);
const params = new URLSearchParams({
client_id: clientId,
response_type: 'code',
scope: 'openid profile email',
redirect_uri: redirectUri,
code_challenge: codeChallenge,
code_challenge_method: 'S256'
});
window.location.href = `${authorizationUrl}?${params}`;
}
// After redirect back to app
async function handleCallback(authCode) {
const codeVerifier = sessionStorage.getItem('code_verifier');
const response = await fetch('/api/token', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
grant_type: 'authorization_code',
code: authCode,
code_verifier: codeVerifier,
client_id: clientId
})
});
const { access_token } = await response.json();
localStorage.setItem('access_token', access_token);
}
Go: Use golang.org/x/oauth2 with go-oidc/v3/oidc for OIDC. Same flow: redirect to auth URL, handle callback, exchange code for token, verify ID token claims.
Trade-offs:
- ✅ Industry standard, well-supported
- ✅ Doesn’t expose user password to application
- ✅ Easy delegation to third-party identity providers
- ❌ More complex than basic authentication
- ❌ Requires redirect flow (not suitable for server-to-server)
Antipatterns:
- ❌ Storing authorization codes indefinitely
- ❌ Sending access tokens through unsecured channels
- ❌ Not validating state parameter (CSRF vulnerability)
- ❌ Storing user password instead of using OAuth
Pattern 2: JWT (JSON Web Tokens) for API Authentication
When to use: Stateless API authentication, microservice-to-microservice, mobile apps
How it works:
- Client authenticates with credentials
- Server creates JWT (Header.Payload.Signature)
- Client includes JWT in Authorization header for each request
- Server validates signature to verify authenticity
JWT Structure:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzdWIiOiJ1c2VyMTIzIiwiZW1haWwiOiJ1c2VyQGV4YW1wbGUuY29tIiwiaWF0IjoxNTE2MjM5MDIyfQ.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
Python Example:
import jwt
from datetime import datetime, timedelta
from flask import Flask, request, jsonify
app = Flask(__name__)
secret_key = "your-secret-key-keep-safe"
def create_jwt(user_id, email):
payload = {
'user_id': user_id,
'email': email,
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(hours=24)
}
token = jwt.encode(payload, secret_key, algorithm='HS256')
return token
def verify_jwt(token):
try:
payload = jwt.decode(token, secret_key, algorithms=['HS256'])
return payload
except jwt.ExpiredSignatureError:
return None # Token expired
except jwt.InvalidTokenError:
return None # Invalid token
@app.route('/login', methods=['POST'])
def login():
credentials = request.get_json()
# Verify username/password (simplified)
if verify_password(credentials['username'], credentials['password']):
user = get_user(credentials['username'])
token = create_jwt(user['id'], user['email'])
return jsonify({'access_token': token})
return jsonify({'error': 'Invalid credentials'}), 401
@app.before_request
def verify_token():
if request.path.startswith('/api/'):
auth_header = request.headers.get('Authorization')
if not auth_header:
return jsonify({'error': 'Missing token'}), 401
try:
token = auth_header.split(' ')[1] # "Bearer <token>"
payload = verify_jwt(token)
if not payload:
return jsonify({'error': 'Invalid token'}), 401
request.user_id = payload['user_id']
except:
return jsonify({'error': 'Invalid token'}), 401
@app.route('/api/user-profile')
def user_profile():
user = get_user_by_id(request.user_id)
return jsonify(user)
Go: Use github.com/golang-jwt/jwt/v5 with custom claims struct. Same pattern: create with jwt.NewWithClaims(), verify with jwt.ParseWithClaims(), middleware extracts claims to context.
Trade-offs:
- ✅ Stateless (no server session needed)
- ✅ Scalable across multiple servers
- ✅ Works well for APIs and microservices
- ❌ Token size larger than session cookies
- ❌ Can’t revoke tokens immediately (use token blacklists for logout)
Antipatterns:
- ❌ Storing sensitive data in JWT (it’s base64-encoded, not encrypted)
- ❌ Using weak secret keys
- ❌ Not validating expiration
- ❌ Storing JWT in local storage (use httpOnly cookies for web apps)
Pattern 3: mTLS (Mutual TLS) for Service-to-Service Authentication
When to use: Internal microservice communication, service mesh, high-security requirements
How it works:
- Both client and server present certificates
- Both verify each other’s certificates
- TLS handshake establishes encrypted connection
- Communication is authenticated and encrypted
Go Example (mTLS Server):
package main
import (
"crypto/tls"
"log"
"net/http"
)
func main() {
// Load server certificate and key
cert, err := tls.LoadX509KeyPair("server.crt", "server.key")
if err != nil {
log.Fatal(err)
}
// Load client CA certificate for verification
caCert, err := ioutil.ReadFile("client-ca.crt")
if err != nil {
log.Fatal(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
// Configure TLS with client certificate verification
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
MinVersion: tls.VersionTLS12,
}
server := &http.Server{
Addr: ":8443",
TLSConfig: tlsConfig,
}
http.HandleFunc("/api/data", func(w http.ResponseWriter, r *http.Request) {
// Client cert is verified by TLS layer
clientName := r.TLS.PeerCertificates[0].Subject.CommonName
log.Printf("Request from service: %s\n", clientName)
w.WriteHeader(http.StatusOK)
w.Write([]byte("Authenticated service data"))
})
log.Println("mTLS server listening on :8443")
log.Fatal(server.ListenAndServeTLS("", ""))
}
Go Example (mTLS Client):
func createMTLSClient(certFile, keyFile, caFile string) (*http.Client, error) {
// Load client certificate
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}
// Load server CA certificate
caCert, err := ioutil.ReadFile(caFile)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
// Configure TLS
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
MinVersion: tls.VersionTLS12,
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}
return client, nil
}
// Usage
client, _ := createMTLSClient("client.crt", "client.key", "ca.crt")
resp, _ := client.Get("https://internal-service:8443/api/data")
Trade-offs:
- ✅ Strongest authentication (mutual verification)
- ✅ Encrypted in transit
- ✅ No shared secrets
- ❌ Certificate management overhead
- ❌ More complex to set up than API keys
- ❌ Performance cost of TLS handshake
Authorization Patterns
Authorization answers: “What are you allowed to do?”
Pattern 1: RBAC (Role-Based Access Control)
When to use: Most common authorization, clear role definitions
How it works: Users have roles, roles have permissions. Check if user’s role has required permission.
Python Example:
from enum import Enum
from functools import wraps
class Role(Enum):
ADMIN = "admin"
MANAGER = "manager"
USER = "user"
class Permission(Enum):
READ = "read"
WRITE = "write"
DELETE = "delete"
MANAGE_USERS = "manage_users"
ROLE_PERMISSIONS = {
Role.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.MANAGE_USERS],
Role.MANAGER: [Permission.READ, Permission.WRITE, Permission.DELETE],
Role.USER: [Permission.READ],
}
def require_permission(required_permission):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
user_role = get_current_user_role()
if required_permission not in ROLE_PERMISSIONS.get(user_role, []):
raise PermissionError(f"User role {user_role} lacks {required_permission}")
return func(*args, **kwargs)
return wrapper
return decorator
@app.route('/api/data', methods=['POST'])
@require_permission(Permission.WRITE)
def create_data():
# Only users with WRITE permission can access this
return jsonify({'created': True})
@app.route('/api/users/<user_id>', methods=['DELETE'])
@require_permission(Permission.MANAGE_USERS)
def delete_user(user_id):
# Only admins can delete users
return jsonify({'deleted': user_id})
Go Example:
type Role string
const (
RoleAdmin Role = "admin"
RoleManager Role = "manager"
RoleUser Role = "user"
)
type Permission string
const (
PermissionRead Permission = "read"
PermissionWrite Permission = "write"
PermissionDelete Permission = "delete"
PermissionManageUsers Permission = "manage_users"
)
var rolePermissions = map[Role][]Permission{
RoleAdmin: {PermissionRead, PermissionWrite, PermissionDelete, PermissionManageUsers},
RoleManager: {PermissionRead, PermissionWrite, PermissionDelete},
RoleUser: {PermissionRead},
}
func hasPermission(userRole Role, requiredPerm Permission) bool {
permissions := rolePermissions[userRole]
for _, p := range permissions {
if p == requiredPerm {
return true
}
}
return false
}
func requirePermission(perm Permission) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
userRole := getUserRole(r)
if !hasPermission(userRole, perm) {
http.Error(w, "Insufficient permissions", http.StatusForbidden)
return
}
next.ServeHTTP(w, r)
})
}
}
// Usage
mux.HandleFunc("/api/data", requirePermission(PermissionWrite)(createDataHandler))
mux.HandleFunc("/api/users/{id}", requirePermission(PermissionManageUsers)(deleteUserHandler))
Trade-offs:
- ✅ Simple and understandable
- ✅ Easy to implement
- ❌ Inflexible for fine-grained control
- ❌ Doesn’t account for context (time, location, resource)
Pattern 2: ABAC (Attribute-Based Access Control)
When to use: Fine-grained control, context-dependent access, complex business rules
How it works: Access decisions based on attributes of user, resource, action, and environment.
Python Example:
from dataclasses import dataclass
from typing import Dict, Any
@dataclass
class AccessContext:
user_id: int
user_dept: str
resource_owner: int
resource_type: str
resource_sensitivity: str
action: str
time_of_day: int
is_vpn: bool
def check_access(context: AccessContext) -> bool:
"""
Complex access control rules:
- Users can only read/write their own data
- Managers can read team data
- High-sensitivity resources only accessible during business hours on VPN
- Admins have unrestricted access
"""
rules = [
# Rule 1: Owner can always access their own data
lambda ctx: ctx.user_id == ctx.resource_owner,
# Rule 2: Managers can read team data
lambda ctx: (ctx.user_dept == "management" and
ctx.action == "read" and
ctx.resource_type == "team_data"),
# Rule 3: High-sensitivity only during business hours on VPN
lambda ctx: not (ctx.resource_sensitivity == "high" and
(ctx.time_of_day < 9 or ctx.time_of_day > 17 or not ctx.is_vpn)),
# Rule 4: Admins bypass all checks
lambda ctx: ctx.user_dept == "admin",
]
return any(rule(context) for rule in rules)
# Usage
context = AccessContext(
user_id=123,
user_dept="engineering",
resource_owner=123,
resource_type="personal_data",
resource_sensitivity="high",
action="read",
time_of_day=14,
is_vpn=True
)
if check_access(context):
return get_resource()
else:
raise PermissionError("Access denied")
Trade-offs:
- ✅ Highly flexible
- ✅ Handles complex business logic
- ❌ Hard to understand and maintain
- ❌ Performance overhead of evaluation
Secret Management Patterns
Pattern 1: Encrypted Secret Vault
When to use: Production applications, sensitive credentials (API keys, database passwords)
Go Example with HashiCorp Vault:
import "github.com/hashicorp/vault/api"
func getSecretFromVault(secretPath string) (string, error) {
config := api.DefaultConfig()
config.Address = "https://vault.example.com:8200"
client, err := api.NewClient(config)
if err != nil {
return "", err
}
// Authenticate with service token or approle
auth := client.Auth().Token()
secret, err := auth.RenewSelf(1, 3600)
if err != nil {
return "", err
}
// Read secret
secret, err = client.Logical().Read(secretPath)
if err != nil {
return "", err
}
// Extract value
dbPassword := secret.Data["data"].(map[string]interface{})["password"].(string)
return dbPassword, nil
}
// Usage
dbPassword, _ := getSecretFromVault("secret/database/prod")
db.Connect(dbPassword)
Trade-offs:
- ✅ Centralized secret management
- ✅ Audit trail of secret access
- ✅ Rotation without app restart
- ❌ Additional infrastructure
- ❌ Single point of failure
Data Protection Patterns
Pattern 1: Encryption at Rest
When to use: Sensitive data in databases, file systems, backups
Python Example:
from cryptography.fernet import Fernet
import base64
import hashlib
def encrypt_field(plaintext: str, encryption_key: str) -> str:
"""Encrypt a single field using Fernet (AES)"""
key = base64.urlsafe_b64encode(
hashlib.sha256(encryption_key.encode()).digest()
)
cipher = Fernet(key)
encrypted = cipher.encrypt(plaintext.encode())
return encrypted.decode()
def decrypt_field(ciphertext: str, encryption_key: str) -> str:
"""Decrypt a field"""
key = base64.urlsafe_b64encode(
hashlib.sha256(encryption_key.encode()).digest()
)
cipher = Fernet(key)
decrypted = cipher.decrypt(ciphertext.encode())
return decrypted.decode()
# Usage in ORM
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
email = Column(String)
ssn = Column(String) # Always encrypted
@property
def ssn_decrypted(self):
return decrypt_field(self.ssn, app.config['ENCRYPTION_KEY'])
@ssn_decrypted.setter
def ssn_decrypted(self, value):
self.ssn = encrypt_field(value, app.config['ENCRYPTION_KEY'])
# In database: ssn is stored encrypted
user = User(email='user@example.com')
user.ssn_decrypted = '123-45-6789' # Automatically encrypted on save
session.add(user)
session.commit() # Stored as encrypted ciphertext
# On retrieval: transparently decrypted
retrieved_user = session.query(User).first()
print(retrieved_user.ssn_decrypted) # '123-45-6789'
Trade-offs:
- ✅ Protects data at rest (database breaches)
- ✅ Compliance requirement (PCI-DSS, HIPAA, GDPR)
- ❌ Key management complexity
- ❌ Performance overhead (encrypt/decrypt on every access)
Input Validation Pattern
Validate All External Input
When to use: Every entry point (APIs, forms, file uploads, external systems)
Python Example:
from pydantic import BaseModel, EmailStr, Field, validator
from typing import Optional
class UserCreateRequest(BaseModel):
email: EmailStr
username: str = Field(..., min_length=3, max_length=50)
password: str = Field(..., min_length=8)
age: int = Field(..., ge=0, le=150)
@validator('username')
def username_alphanumeric(cls, v):
if not v.isalnum():
raise ValueError('must be alphanumeric')
return v
@validator('password')
def password_complexity(cls, v):
if not any(c.isupper() for c in v):
raise ValueError('must contain uppercase')
if not any(c.isdigit() for c in v):
raise ValueError('must contain number')
return v
@app.post("/api/users")
def create_user(user: UserCreateRequest):
# pydantic validates automatically
# Invalid input returns 422 error
db_user = create_in_db(user.dict())
return db_user
Go Example:
type UserCreateRequest struct {
Email string `json:"email" binding:"required,email"`
Username string `json:"username" binding:"required,min=3,max=50"`
Password string `json:"password" binding:"required,min=8"`
Age int `json:"age" binding:"required,min=0,max=150"`
}
func createUser(c *gin.Context) {
var req UserCreateRequest
// Validate input
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// Additional validation
if !isStrongPassword(req.Password) {
c.JSON(http.StatusBadRequest, gin.H{"error": "weak password"})
return
}
user := createInDB(req)
c.JSON(http.StatusCreated, user)
}
func isStrongPassword(pwd string) bool {
hasUpper := false
hasDigit := false
for _, c := range pwd {
if unicode.IsUpper(c) {
hasUpper = true
}
if unicode.IsDigit(c) {
hasDigit = true
}
}
return hasUpper && hasDigit && len(pwd) >= 8
}
Common Security Antipatterns
❌ Storing passwords in plaintext - Always hash with bcrypt/scrypt ❌ Logging sensitive data - Never log passwords, tokens, PII ❌ Hardcoding secrets - Use vault or environment variables ❌ SQL injection - Use parameterized queries, never string concatenation ❌ XSS vulnerabilities - Always encode/escape output ❌ Trusting client-side validation - Always validate server-side ❌ Weak TLS versions - Use TLS 1.2+ minimum ❌ Ignoring certificate expiration - Monitor and rotate regularly
When to Use Security Patterns
Use these patterns when:
- Building APIs with external users
- Handling sensitive data (PII, payments, health)
- Meeting compliance requirements (HIPAA, GDPR, PCI-DSS, SOC 2)
- Building multi-tenant systems
- Microservices with inter-service communication
Don’t over-engineer:
- Internal tools with limited users: simple auth is fine
- Publicly documented data: encryption not needed
- MVPs: start simple, add security as you scale
Related Commands
- See
/pb-securityfor security review checklist - See
/pb-review-microservicefor microservice security review - See
/pb-patterns-corefor OWASP patterns overview - See
/pb-loggingfor secure logging practices
Use these patterns as building blocks. Security is layered, not single-solution.