A Coding Guide to Build a Production-Ready Asynchronous Python SDK with Rate Limiting, In-Memory Caching, and Authentication


In this tutorial, we guide users through building a robust, production-ready Python SDK. It begins by showing how to install and configure essential asynchronous HTTP libraries (aiohttp, nest-asyncio). It then walks through the implementation of core components, including structured response objects, token-bucket rate limiting, in-memory caching with TTL, and a clean, dataclass-driven design. We’ll see how to wrap these pieces up in an AdvancedSDK class that supports async context management, automatic retry/wait-on-rate-limit behavior, JSON/auth headers injection, and convenient HTTP-verb methods. Along the way, a demo harness against JSONPlaceholder illustrates caching efficiency, batch fetching with rate limits, error handling, and even shows how to extend the SDK via a fluent “builder” pattern for custom configuration.

import asyncio
import aiohttp
import time
import json
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import hashlib
import logging


!pip install aiohttp nest-asyncio

We install and configure the asynchronous runtime by importing asyncio and aiohttp, alongside utilities for timing, JSON handling, dataclass modeling, caching (via hashlib and datetime), and structured logging. The !pip install aiohttp nest-asyncio line ensures that the notebook can run an event loop seamlessly within Colab, enabling robust async HTTP requests and rate-limited workflows.

@dataclass
class APIResponse:
    """Structured response object"""
    data: Any
    status_code: int
    headers: Dict[str, str]
    timestamp: datetime
   
    def to_dict(self) -> Dict:
        return asdict(self)

The APIResponse dataclass encapsulates HTTP response details, payload (data), status code, headers, and the timestamp of retrieval into a single, typed object. The to_dict() helper converts the instance into a plain dictionary for easy logging, serialization, or downstream processing.

class RateLimiter:
    """Token bucket rate limiter"""
    def __init__(self, max_calls: int = 100, time_window: int = 60):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []
   
    def can_proceed(self) -> bool:
        now = time.time()
        self.calls = [call_time for call_time in self.calls if now - call_time < self.time_window]
       
        if len(self.calls) < self.max_calls:
            self.calls.append(now)
            return True
        return False
   
    def wait_time(self) -> float:
        if not self.calls:
            return 0
        return max(0, self.time_window - (time.time() - self.calls[0]))

The RateLimiter class enforces a simple token-bucket policy by tracking the timestamps of recent calls and allowing up to max_calls within a rolling time_window. When the limit is reached, can_proceed() returns False, and wait_time() calculates how long to pause before making the next request.

class Cache:
    """Simple in-memory cache with TTL"""
    def __init__(self, default_ttl: int = 300):
        self.cache = {}
        self.default_ttl = default_ttl
   
    def _generate_key(self, method: str, url: str, params: Dict = None) -> str:
        key_data = f"{method}:{url}:{json.dumps(params or {}, sort_keys=True)}"
        return hashlib.md5(key_data.encode()).hexdigest()
   
    def get(self, method: str, url: str, params: Dict = None) -> Optional[APIResponse]:
        key = self._generate_key(method, url, params)
        if key in self.cache:
            response, expiry = self.cache[key]
            if datetime.now() < expiry:
                return response
            del self.cache[key]
        return None
   
    def set(self, method: str, url: str, response: APIResponse, params: Dict = None, ttl: int = None):
        key = self._generate_key(method, url, params)
        expiry = datetime.now() + timedelta(seconds=ttl or self.default_ttl)
        self.cache[key] = (response, expiry)

The Cache class provides a lightweight in-memory TTL cache for API responses by hashing the request signature (method, URL, params) into a unique key. It returns valid cached APIResponse objects before expiry and automatically evicts stale entries after their time-to-live has elapsed.

class AdvancedSDK:
    """Advanced SDK with modern Python patterns"""
   
    def __init__(self, base_url: str, api_key: str = None, rate_limit: int = 100):
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.session = None
        self.rate_limiter = RateLimiter(max_calls=rate_limit)
        self.cache = Cache()
        self.logger = self._setup_logger()
       
    def _setup_logger(self) -> logging.Logger:
        logger = logging.getLogger(f"SDK-{id(self)}")
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger
   
    async def __aenter__(self):
        """Async context manager entry"""
        self.session = aiohttp.ClientSession()
        return self
   
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.session:
            await self.session.close()
   
    def _get_headers(self) -> Dict[str, str]:
        headers = {'Content-Type': 'application/json'}
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        return headers
   
    async def _make_request(self, method: str, endpoint: str, params: Dict = None,
                          data: Dict = None, use_cache: bool = True) -> APIResponse:
        """Core request method with rate limiting and caching"""
       
        if use_cache and method.upper() == 'GET':
            cached = self.cache.get(method, endpoint, params)
            if cached:
                self.logger.info(f"Cache hit for {method} {endpoint}")
                return cached
       
        if not self.rate_limiter.can_proceed():
            wait_time = self.rate_limiter.wait_time()
            self.logger.warning(f"Rate limit hit, waiting {wait_time:.2f}s")
            await asyncio.sleep(wait_time)
       
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
       
        try:
            async with self.session.request(
                method=method.upper(),
                url=url,
                params=params,
                json=data,
                headers=self._get_headers()
            ) as resp:
                response_data = await resp.json() if resp.content_type == 'application/json' else await resp.text()
               
                api_response = APIResponse(
                    data=response_data,
                    status_code=resp.status,
                    headers=dict(resp.headers),
                    timestamp=datetime.now()
                )
               
                if use_cache and method.upper() == 'GET' and 200 <= resp.status < 300:
                    self.cache.set(method, endpoint, api_response, params)
               
                self.logger.info(f"{method.upper()} {endpoint} - Status: {resp.status}")
                return api_response
               
        except Exception as e:
            self.logger.error(f"Request failed: {str(e)}")
            raise
   
    async def get(self, endpoint: str, params: Dict = None, use_cache: bool = True) -> APIResponse:
        return await self._make_request('GET', endpoint, params=params, use_cache=use_cache)
   
    async def post(self, endpoint: str, data: Dict = None) -> APIResponse:
        return await self._make_request('POST', endpoint, data=data, use_cache=False)
   
    async def put(self, endpoint: str, data: Dict = None) -> APIResponse:
        return await self._make_request('PUT', endpoint, data=data, use_cache=False)
   
    async def delete(self, endpoint: str) -> APIResponse:
        return await self._make_request('DELETE', endpoint, use_cache=False)

The AdvancedSDK class wraps everything together into a clean, async-first client: it manages an aiohttp session via async context managers, injects JSON and auth headers, and coordinates our RateLimiter and Cache under the hood. Its _make_request method centralizes GET/POST/PUT/DELETE logic, handling cache lookups, rate-limit waits, error logging, and response packing into APIResponse objects, while the get/post/put/delete helpers give us ergonomic, high-level calls.

async def demo_sdk():
    """Demonstrate SDK capabilities"""
    print("🚀 Advanced SDK Demo")
    print("=" * 50)
   
    async with AdvancedSDK("https://jsonplaceholder.typicode.com") as sdk:
       
        print("\n📥 Testing GET request with caching...")
        response1 = await sdk.get("/posts/1")
        print(f"First request - Status: {response1.status_code}")
        print(f"Title: {response1.data.get('title', 'N/A')}")
       
        response2 = await sdk.get("/posts/1")
        print(f"Second request (cached) - Status: {response2.status_code}")
       
        print("\n📤 Testing POST request...")
        new_post = {
            "title": "Advanced SDK Tutorial",
            "body": "This SDK demonstrates modern Python patterns",
            "userId": 1
        }
        post_response = await sdk.post("/posts", data=new_post)
        print(f"POST Status: {post_response.status_code}")
        print(f"Created post ID: {post_response.data.get('id', 'N/A')}")
       
        print("\n⚡ Testing batch requests with rate limiting...")
        tasks = []
        for i in range(1, 6):
            tasks.append(sdk.get(f"/posts/{i}"))
       
        results = await asyncio.gather(*tasks)
        print(f"Batch completed: {len(results)} requests")
        for i, result in enumerate(results, 1):
            print(f"  Post {i}: {result.data.get('title', 'N/A')[:30]}...")
       
        print("\n❌ Testing error handling...")
        try:
            error_response = await sdk.get("/posts/999999")
            print(f"Error response status: {error_response.status_code}")
        except Exception as e:
            print(f"Handled error: {type(e).__name__}")
   
    print("\n✅ Demo completed successfully!")


async def run_demo():
  """Colab-friendly demo runner"""
  await demo_sdk()

The demo_sdk coroutine walks through the SDK’s core features, issuing a cached GET request, performing a POST, executing a batch of GETs under rate limiting, and handling errors, against the JSONPlaceholder API, printing status codes and sample data to illustrate each capability. The run_demo helper ensures this demo runs smoothly inside a Colab notebook’s existing event loop.

import nest_asyncio
nest_asyncio.apply()


if __name__ == "__main__":
    try:
        asyncio.run(demo_sdk())
    except RuntimeError:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(demo_sdk())


class SDKBuilder:
    """Builder pattern for SDK configuration"""
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.config = {}
   
    def with_auth(self, api_key: str):
        self.config['api_key'] = api_key
        return self
   
    def with_rate_limit(self, calls_per_minute: int):
        self.config['rate_limit'] = calls_per_minute
        return self
   
    def build(self) -> AdvancedSDK:
        return AdvancedSDK(self.base_url, **self.config)

Finally, we apply nest_asyncio to enable nested event loops in Colab, then run the demo via asyncio.run (with a fallback to manual loop execution if needed). It also introduces an SDKBuilder class that implements a fluent builder pattern for easily configuring and instantiating the AdvancedSDK with custom authentication and rate-limit settings.

In conclusion, this SDK tutorial provides a scalable foundation for any RESTful integration, combining modern Python idioms (dataclasses, async/await, context managers) with practical tooling (rate limiter, cache, structured logging). By adapting the patterns shown here, particularly the separation of concerns between request orchestration, caching, and response modeling, teams can accelerate the development of new API clients while ensuring predictability, observability, and resilience.


Check out the Codes. All credit for this research goes to the researchers of this project. Also, feel free to follow us on Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to our Newsletter.


Sana Hassan, a consulting intern at Marktechpost and dual-degree student at IIT Madras, is passionate about applying technology and AI to address real-world challenges. With a keen interest in solving practical problems, he brings a fresh perspective to the intersection of AI and real-life solutions.



Source link

Leave a Reply

Your email address will not be published. Required fields are marked *