Overview

Streaming allows you to receive partial responses from the Perplexity API as they are generated, rather than waiting for the complete response. This is particularly useful for:
  • Real-time user experiences - Display responses as they’re generated
  • Long responses - Start showing content immediately for lengthy analyses
  • Interactive applications - Provide immediate feedback to users
Streaming is supported across all Perplexity models including Sonar, Sonar Pro, and reasoning models.

Quick Start

The easiest way to get started is with the Perplexity SDKs, which handle all the streaming parsing automatically. To enable streaming, set stream=True (Python) or stream: true (TypeScript) when creating completions:
from perplexity import Perplexity

# Initialize the client (uses PERPLEXITY_API_KEY environment variable)
client = Perplexity()

# Create streaming completion
stream = client.chat.completions.create(
    model="sonar",
    messages=[{"role": "user", "content": "What is the latest in AI research?"}],
    stream=True
)

# Process streaming response
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

Search Results and Metadata During Streaming

Search results and metadata are delivered in the final chunk(s) of a streaming response, not progressively during the stream.

How Metadata Works with Streaming

When streaming, you receive:
  1. Content chunks which arrive progressively in real-time
  2. Search results (delivered in the final chunk(s))
  3. Usage stats and other metadata

Collecting Metadata During Streaming

from perplexity import Perplexity

def stream_with_metadata():
    client = Perplexity()
    
    stream = client.chat.completions.create(
        model="sonar",
        messages=[{"role": "user", "content": "Explain quantum computing"}],
        stream=True
    )
    
    content = ""
    search_results = []
    usage_info = None
    
    for chunk in stream:
        # Process content
        if chunk.choices[0].delta.content:
            content_piece = chunk.choices[0].delta.content
            content += content_piece
            print(content_piece, end='', flush=True)
        
        # Collect metadata from final chunks
        if hasattr(chunk, 'search_results') and chunk.search_results:
            search_results = chunk.search_results
            
        if hasattr(chunk, 'usage') and chunk.usage:
            usage_info = chunk.usage
            
        # Check if streaming is complete
        if chunk.choices[0].finish_reason:
            print(f"\n\nSearch Results: {search_results}")
            print(f"Usage: {usage_info}")
    
    return content, search_results, usage_info

stream_with_metadata()

Error Handling

Proper error handling is important to ensure your application can recover from errors and provide a good user experience.
import perplexity
from perplexity import Perplexity

client = Perplexity()

try:
    stream = client.chat.completions.create(
        model="sonar-pro",
        messages=[
            {"role": "user", "content": "Explain machine learning concepts"}
        ],
        stream=True
    )
    
    content = ""
    for chunk in stream:
        if chunk.choices[0].delta.content:
            content_chunk = chunk.choices[0].delta.content
            content += content_chunk
            print(content_chunk, end="")
            
except perplexity.APIConnectionError as e:
    print(f"Network connection failed: {e}")
except perplexity.RateLimitError as e:
    print(f"Rate limit exceeded, please retry later: {e}")
except perplexity.APIStatusError as e:
    print(f"API error {e.status_code}: {e.response}")
except Exception as e:
    print(f"Unexpected error: {e}")

Proper SSE Parsing

For production use, you should properly parse Server-Sent Events (SSE) format:
# pip install sseclient-py
import sseclient
import requests
import json

def stream_with_proper_sse():
    url = "https://api.perplexity.ai/chat/completions"
    headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "sonar",
        "messages": [{"role": "user", "content": "Explain quantum computing"}],
        "stream": True
    }
    
    response = requests.post(url, headers=headers, json=payload, stream=True)
    client = sseclient.SSEClient(response)
    
    for event in client.events():
        if event.data == '[DONE]':
            break
        try:
            chunk_data = json.loads(event.data)
            content = chunk_data['choices'][0]['delta'].get('content', '')
            if content:
                print(content, end='')
        except json.JSONDecodeError:
            continue

stream_with_proper_sse()

Advanced Streaming Patterns

Buffered Streaming

For applications that need to process chunks in batches:
from perplexity import Perplexity
import time

def buffered_streaming(buffer_size=50, flush_interval=1.0):
    client = Perplexity()
    
    stream = client.chat.completions.create(
        model="sonar",
        messages=[{"role": "user", "content": "Write a detailed explanation of machine learning"}],
        stream=True
    )
    
    buffer = ""
    last_flush = time.time()
    
    for chunk in stream:
        if chunk.choices[0].delta.content:
            buffer += chunk.choices[0].delta.content
            
            # Flush buffer if it's full or enough time has passed
            if len(buffer) >= buffer_size or (time.time() - last_flush) >= flush_interval:
                print(buffer, end='', flush=True)
                buffer = ""
                last_flush = time.time()
    
    # Flush remaining buffer
    if buffer:
        print(buffer, end='', flush=True)

buffered_streaming()

Stream Processing with Callbacks

For applications that need to process chunks with custom logic:
from perplexity import Perplexity
from typing import Callable, Optional

def stream_with_callbacks(
    query: str,
    on_content: Optional[Callable[[str], None]] = None,
    on_search_results: Optional[Callable[[list], None]] = None,
    on_complete: Optional[Callable[[str, dict], None]] = None
):
    client = Perplexity()
    
    stream = client.chat.completions.create(
        model="sonar",
        messages=[{"role": "user", "content": query}],
        stream=True
    )
    
    full_content = ""
    metadata = {}
    
    for chunk in stream:
        # Handle content chunks
        if chunk.choices[0].delta.content:
            content_piece = chunk.choices[0].delta.content
            full_content += content_piece
            if on_content:
                on_content(content_piece)
        
        # Handle search results
        if hasattr(chunk, 'search_results') and chunk.search_results:
            metadata['search_results'] = chunk.search_results
            if on_search_results:
                on_search_results(chunk.search_results)
        
        # Handle other metadata
        if hasattr(chunk, 'usage') and chunk.usage:
            metadata['usage'] = chunk.usage
        
        # Handle completion
        if chunk.choices[0].finish_reason:
            if on_complete:
                on_complete(full_content, metadata)
    
    return full_content, metadata

# Usage example
def print_content(content: str):
    print(content, end='', flush=True)

def handle_search_results(results: list):
    print(f"\n[Found {len(results)} sources]", end='')

def handle_completion(content: str, metadata: dict):
    print(f"\n\nCompleted. Total length: {len(content)} characters")
    if 'usage' in metadata:
        print(f"Token usage: {metadata['usage']}")

stream_with_callbacks(
    "Explain the latest developments in renewable energy",
    on_content=print_content,
    on_search_results=handle_search_results,
    on_complete=handle_completion
)

Best Practices

1

Handle network interruptions

Implement reconnection logic for robust streaming applications.
import time
import random
from perplexity import Perplexity
import perplexity

def robust_streaming(query: str, max_retries: int = 3):
    client = Perplexity()
    
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="sonar",
                messages=[{"role": "user", "content": query}],
                stream=True
            )
            
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    print(chunk.choices[0].delta.content, end='', flush=True)
            
            return  # Success, exit retry loop
            
        except (perplexity.APIConnectionError, perplexity.APITimeoutError) as e:
            if attempt < max_retries - 1:
                delay = (2 ** attempt) + random.uniform(0, 1)
                print(f"\nConnection error, retrying in {delay:.1f}s...")
                time.sleep(delay)
            else:
                print(f"\nFailed after {max_retries} attempts: {e}")
                raise

robust_streaming("Explain quantum computing")
2

Implement proper buffering

Use appropriate buffering strategies for your application’s needs.
# For real-time chat applications
buffer_size = 1  # Character-by-character for immediate display

# For document processing
buffer_size = 100  # Larger chunks for efficiency

# For API responses
buffer_size = 500  # Balance between responsiveness and efficiency
3

Handle metadata appropriately

Remember that search results and metadata arrive at the end of the stream.
# Don't expect search results until the stream is complete
if chunk.choices[0].finish_reason == "stop":
    # Now search results and usage info are available
    process_search_results(chunk.search_results)
    log_usage_stats(chunk.usage)
4

Optimize for your use case

Choose streaming parameters based on your application requirements.
# Optimize for immediate response
stream = client.chat.completions.create(
    model="sonar",
    messages=messages,
    stream=True,
    max_tokens=1000,  # Reasonable limit
    temperature=0.7   # Balanced creativity
)
Important: If you need search results immediately for your user interface, consider using non-streaming requests for use cases where search result display is critical to the real-time user experience.

Resources