Overview

This guide covers essential best practices for using the Perplexity SDKs in production environments. Following these practices will help you build robust, secure, and efficient applications.

Security Best Practices

Environment Variables

Always store API keys securely using environment variables:
1

Use environment variables

Store API keys in environment variables, never in source code.
import os
from perplexity import Perplexity

# Good: Use environment variables
client = Perplexity(
    api_key=os.environ.get("PERPLEXITY_API_KEY")
)

# Bad: Never hardcode API keys
# client = Perplexity(api_key="pplx-abc123...")  # DON'T DO THIS
Never commit API keys to version control. Use .env files locally and secure environment variable management in production.
2

Use .env files for local development

Create a .env file for local development (add it to .gitignore).
.env
PERPLEXITY_API_KEY=your_api_key_here
PERPLEXITY_MAX_RETRIES=3
PERPLEXITY_TIMEOUT=30000
from dotenv import load_dotenv
import os
from perplexity import Perplexity

# Load environment variables from .env file
load_dotenv()

client = Perplexity(
    api_key=os.getenv("PERPLEXITY_API_KEY"),
    max_retries=int(os.getenv("PERPLEXITY_MAX_RETRIES", "3"))
)
3

Validate environment variables

Check for required environment variables at startup.
import os
import sys
from perplexity import Perplexity

def create_client():
    api_key = os.getenv("PERPLEXITY_API_KEY")
    if not api_key:
        print("Error: PERPLEXITY_API_KEY environment variable is required")
        sys.exit(1)
    
    return Perplexity(api_key=api_key)

client = create_client()

API Key Rotation

Implement secure API key rotation:
import os
import logging
from perplexity import Perplexity
from typing import Optional

class SecurePerplexityClient:
    def __init__(self, primary_key: Optional[str] = None, fallback_key: Optional[str] = None):
        self.primary_key = primary_key or os.getenv("PERPLEXITY_API_KEY")
        self.fallback_key = fallback_key or os.getenv("PERPLEXITY_API_KEY_FALLBACK")
        self.current_client = Perplexity(api_key=self.primary_key)
        self.logger = logging.getLogger(__name__)
    
    def _switch_to_fallback(self):
        """Switch to fallback API key if available"""
        if self.fallback_key:
            self.logger.warning("Switching to fallback API key")
            self.current_client = Perplexity(api_key=self.fallback_key)
            return True
        return False
    
    def search(self, query: str, **kwargs):
        try:
            return self.current_client.search.create(query=query, **kwargs)
        except Exception as e:
            if "authentication" in str(e).lower() and self._switch_to_fallback():
                return self.current_client.search.create(query=query, **kwargs)
            raise e

# Usage
client = SecurePerplexityClient()

Rate Limiting and Efficiency

Intelligent Rate Limiting

Implement exponential backoff with jitter:
import time
import random
import asyncio
from typing import TypeVar, Callable, Any
import perplexity
from perplexity import Perplexity

T = TypeVar('T')

class RateLimitedClient:
    def __init__(self, client: Perplexity, max_retries: int = 5):
        self.client = client
        self.max_retries = max_retries
    
    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter"""
        base_delay = 2 ** attempt
        jitter = random.uniform(0.1, 0.5)
        return min(base_delay + jitter, 60.0)  # Cap at 60 seconds
    
    def with_retry(self, func: Callable[[], T]) -> T:
        """Execute function with intelligent retry logic"""
        last_exception = None
        
        for attempt in range(self.max_retries):
            try:
                return func()
            except perplexity.RateLimitError as e:
                last_exception = e
                if attempt < self.max_retries - 1:
                    delay = self._calculate_delay(attempt)
                    print(f"Rate limited. Retrying in {delay:.2f}s (attempt {attempt + 1})")
                    time.sleep(delay)
                    continue
                raise e
            except perplexity.APIConnectionError as e:
                last_exception = e
                if attempt < self.max_retries - 1:
                    delay = min(2 ** attempt, 10.0)  # Shorter delay for connection errors
                    print(f"Connection error. Retrying in {delay:.2f}s")
                    time.sleep(delay)
                    continue
                raise e
        
        raise last_exception
    
    def search(self, query: str, **kwargs):
        return self.with_retry(
            lambda: self.client.search.create(query=query, **kwargs)
        )

# Usage
client = RateLimitedClient(Perplexity())
result = client.search("artificial intelligence")

Request Batching

Efficiently batch multiple requests:
import asyncio
from typing import List, TypeVar, Generic
from perplexity import AsyncPerplexity, DefaultAioHttpClient

T = TypeVar('T')

class BatchProcessor(Generic[T]):
    def __init__(self, batch_size: int = 5, delay_between_batches: float = 1.0):
        self.batch_size = batch_size
        self.delay_between_batches = delay_between_batches
    
    async def process_batch(
        self, 
        items: List[str], 
        process_func: Callable[[str], Awaitable[T]]
    ) -> List[T]:
        """Process items in batches with rate limiting"""
        results = []
        
        for i in range(0, len(items), self.batch_size):
            batch = items[i:i + self.batch_size]
            
            # Process batch concurrently
            tasks = [process_func(item) for item in batch]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Filter out exceptions and collect results
            for result in batch_results:
                if not isinstance(result, Exception):
                    results.append(result)
            
            # Delay between batches
            if i + self.batch_size < len(items):
                await asyncio.sleep(self.delay_between_batches)
        
        return results

# Usage
async def main():
    processor = BatchProcessor(batch_size=3, delay_between_batches=0.5)
    
    async with AsyncPerplexity(
        http_client=DefaultAioHttpClient()
    ) as client:
        
        async def search_query(query: str):
            return await client.search.create(query=query)
        
        queries = ["AI", "ML", "DL", "NLP", "CV"]
        results = await processor.process_batch(queries, search_query)
        
        print(f"Processed {len(results)} successful queries")

asyncio.run(main())

Production Configuration

Configuration Management

Use environment-based configuration for different deployment stages:
import os
from dataclasses import dataclass
from typing import Optional
import httpx
from perplexity import Perplexity, DefaultHttpxClient

@dataclass
class PerplexityConfig:
    api_key: str
    max_retries: int = 3
    timeout_seconds: float = 30.0
    max_connections: int = 100
    max_keepalive: int = 20
    environment: str = "production"
    
    @classmethod
    def from_env(cls) -> "PerplexityConfig":
        """Load configuration from environment variables"""
        api_key = os.getenv("PERPLEXITY_API_KEY")
        if not api_key:
            raise ValueError("PERPLEXITY_API_KEY environment variable is required")
        
        return cls(
            api_key=api_key,
            max_retries=int(os.getenv("PERPLEXITY_MAX_RETRIES", "3")),
            timeout_seconds=float(os.getenv("PERPLEXITY_TIMEOUT", "30.0")),
            max_connections=int(os.getenv("PERPLEXITY_MAX_CONNECTIONS", "100")),
            max_keepalive=int(os.getenv("PERPLEXITY_MAX_KEEPALIVE", "20")),
            environment=os.getenv("ENVIRONMENT", "production")
        )
    
    def create_client(self) -> Perplexity:
        """Create optimized client based on configuration"""
        timeout = httpx.Timeout(
            connect=5.0,
            read=self.timeout_seconds,
            write=10.0,
            pool=10.0
        )
        
        limits = httpx.Limits(
            max_keepalive_connections=self.max_keepalive,
            max_connections=self.max_connections,
            keepalive_expiry=60.0 if self.environment == "production" else 30.0
        )
        
        return Perplexity(
            api_key=self.api_key,
            max_retries=self.max_retries,
            timeout=timeout,
            http_client=DefaultHttpxClient(limits=limits)
        )

# Usage
config = PerplexityConfig.from_env()
client = config.create_client()

Monitoring and Logging

Implement comprehensive monitoring:
import logging
import time
import functools
from typing import Any, Callable
from perplexity import Perplexity
import perplexity

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class MonitoredPerplexityClient:
    def __init__(self, client: Perplexity):
        self.client = client
        self.request_count = 0
        self.error_count = 0
        self.total_response_time = 0.0
    
    def _log_request(self, method: str, **kwargs):
        """Log request details"""
        self.request_count += 1
        logger.info(f"Making {method} request #{self.request_count}")
        logger.debug(f"Request parameters: {kwargs}")
    
    def _log_response(self, method: str, duration: float, success: bool = True):
        """Log response details"""
        self.total_response_time += duration
        avg_response_time = self.total_response_time / self.request_count
        
        if success:
            logger.info(f"{method} completed in {duration:.2f}s (avg: {avg_response_time:.2f}s)")
        else:
            self.error_count += 1
            logger.error(f"{method} failed after {duration:.2f}s (errors: {self.error_count})")
    
    def search(self, query: str, **kwargs):
        self._log_request("search", query=query, **kwargs)
        start_time = time.time()
        
        try:
            result = self.client.search.create(query=query, **kwargs)
            duration = time.time() - start_time
            self._log_response("search", duration, success=True)
            return result
        except Exception as e:
            duration = time.time() - start_time
            self._log_response("search", duration, success=False)
            logger.error(f"Search error: {type(e).__name__}: {e}")
            raise
    
    def get_stats(self):
        """Get client statistics"""
        return {
            "total_requests": self.request_count,
            "error_count": self.error_count,
            "success_rate": (self.request_count - self.error_count) / max(self.request_count, 1),
            "avg_response_time": self.total_response_time / max(self.request_count, 1)
        }

# Usage
client = MonitoredPerplexityClient(Perplexity())
result = client.search("machine learning")
print(client.get_stats())

Error Handling Best Practices

Graceful Degradation

Implement fallback strategies for different error types:
from typing import Optional, Dict, Any
import perplexity
from perplexity import Perplexity

class ResilientPerplexityClient:
    def __init__(self, client: Perplexity):
        self.client = client
        self.circuit_breaker_threshold = 5
        self.circuit_breaker_count = 0
        self.circuit_breaker_open = False
    
    def _should_circuit_break(self) -> bool:
        """Check if circuit breaker should be triggered"""
        return self.circuit_breaker_count >= self.circuit_breaker_threshold
    
    def _record_failure(self):
        """Record a failure for circuit breaker"""
        self.circuit_breaker_count += 1
        if self._should_circuit_break():
            self.circuit_breaker_open = True
            print("Circuit breaker activated - temporarily disabling API calls")
    
    def _record_success(self):
        """Record a success - reset circuit breaker"""
        self.circuit_breaker_count = 0
        self.circuit_breaker_open = False
    
    def search_with_fallback(
        self, 
        query: str, 
        fallback_response: Optional[Dict[str, Any]] = None
    ):
        """Search with graceful degradation"""
        if self.circuit_breaker_open:
            print("Circuit breaker open - returning fallback response")
            return fallback_response or {
                "query": query,
                "results": [],
                "status": "service_unavailable"
            }
        
        try:
            result = self.client.search.create(query=query)
            self._record_success()
            return result
            
        except perplexity.RateLimitError:
            print("Rate limited - implementing backoff strategy")
            # Could implement intelligent backoff here
            raise
            
        except perplexity.APIConnectionError as e:
            print(f"Connection error: {e}")
            self._record_failure()
            return fallback_response or {
                "query": query,
                "results": [],
                "status": "connection_error"
            }
            
        except Exception as e:
            print(f"Unexpected error: {e}")
            self._record_failure()
            return fallback_response or {
                "query": query,
                "results": [],
                "status": "error"
            }

# Usage
client = ResilientPerplexityClient(Perplexity())
result = client.search_with_fallback("machine learning")

Testing Best Practices

Unit Testing with Mocking

Create testable code with proper mocking:
import unittest
from unittest.mock import Mock, patch
from perplexity import Perplexity
from perplexity.types import SearchCreateResponse, SearchResult

class TestPerplexityIntegration(unittest.TestCase):
    def setUp(self):
        self.mock_client = Mock(spec=Perplexity)
        
    def test_search_success(self):
        # Mock successful response
        mock_result = SearchResult(
            title="Test Result",
            url="https://example.com",
            snippet="Test snippet"
        )
        
        mock_response = SearchCreateResponse(
            query="test query",
            results=[mock_result]
        )
        
        self.mock_client.search.create.return_value = mock_response
        
        # Test your application logic
        result = self.mock_client.search.create(query="test query")
        
        self.assertEqual(result.query, "test query")
        self.assertEqual(len(result.results), 1)
        self.assertEqual(result.results[0].title, "Test Result")
    
    @patch('perplexity.Perplexity')
    def test_rate_limit_handling(self, mock_perplexity_class):
        # Mock rate limit error
        mock_client = Mock()
        mock_perplexity_class.return_value = mock_client
        mock_client.search.create.side_effect = perplexity.RateLimitError("Rate limited")
        
        # Test your error handling logic here
        with self.assertRaises(perplexity.RateLimitError):
            mock_client.search.create(query="test")

if __name__ == '__main__':
    unittest.main()

Performance Best Practices Summary

1

Use environment variables for configuration

Never hardcode API keys or configuration values.
2

Implement intelligent rate limiting

Use exponential backoff with jitter for retry strategies.
3

Configure connection pooling

Optimize HTTP client settings for your use case.
4

Monitor and log appropriately

Track performance metrics and error rates.
5

Implement graceful degradation

Provide fallback responses when APIs are unavailable.
6

Write testable code

Use dependency injection and mocking for unit tests.