Master production NLP systems including text classification, named entity recognition, sentiment analysis, and document processing pipelines at scale.
Natural Language Processing has evolved from academic experiments to production systems processing billions of text documents daily. After building NLP pipelines for customer service, content analysis, and document processing, I've learned that production NLP requires robust preprocessing, efficient model serving, and comprehensive text quality monitoring. Here's your complete guide to production-ready text processing systems.
NLP Pipeline Architecture
Production Text Processing Framework
# nlp_pipeline.py
import spacy
import torch
from transformers import AutoTokenizer, AutoModel, AutoModelForSequenceClassification
from typing import Dict, List, Tuple, Any, Optional, Union
from dataclasses import dataclass
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
import re
from abc import ABC, abstractmethod
@dataclass
class TextDocument:
id: str
text: str
metadata: Dict[str, Any]
language: Optional[str] = None
encoding: str = 'utf-8'
@dataclass
class NLPResult:
document_id: str
entities: List[Dict]
sentiment: Dict[str, float]
categories: List[Dict]
topics: List[Dict]
embeddings: Optional[torch.Tensor] = None
processing_time_ms: float = 0.0
quality_score: float = 1.0
class NLPProcessingError(Exception):
    """Raised when a document fails anywhere in the pipeline."""
    pass

class NLPProcessor(ABC):
    @abstractmethod
    async def process(self, document: TextDocument) -> Dict[str, Any]:
        pass
class ProductionNLPPipeline:
def __init__(self,
preprocessor=None,
processors: List[NLPProcessor] = None,
postprocessor=None,
batch_size: int = 32,
max_workers: int = 4):
self.preprocessor = preprocessor or DefaultTextPreprocessor()
self.processors = processors or []
self.postprocessor = postprocessor or DefaultPostprocessor()
self.batch_size = batch_size
self.executor = ThreadPoolExecutor(max_workers=max_workers)
# Performance monitoring
self.performance_metrics = {
'documents_processed': 0,
'average_latency_ms': 0,
'throughput_docs_per_sec': 0,
'error_count': 0
}
async def process_document(self, document: TextDocument) -> NLPResult:
"""Process single document through complete NLP pipeline"""
start_time = time.time()
try:
# Preprocessing
preprocessed_doc = await self.preprocessor.process(document)
# Run all NLP processors
processing_results = {}
for processor in self.processors:
processor_name = processor.__class__.__name__
result = await processor.process(preprocessed_doc)
processing_results[processor_name] = result
# Postprocessing
final_result = await self.postprocessor.combine_results(
document, processing_results
)
# Calculate processing time
processing_time = (time.time() - start_time) * 1000
final_result.processing_time_ms = processing_time
# Update metrics
self._update_metrics(processing_time, True)
return final_result
except Exception as e:
self._update_metrics(0, False)
raise NLPProcessingError(f"Pipeline processing failed: {str(e)}")
async def process_batch(self, documents: List[TextDocument]) -> List[NLPResult]:
"""Process batch of documents efficiently"""
results = []
# Process in batches
for i in range(0, len(documents), self.batch_size):
batch = documents[i:i + self.batch_size]
# Process batch in parallel
tasks = [
self.process_document(doc)
for doc in batch
]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            # Handle exceptions, keeping the originating document id
            for doc, result in zip(batch, batch_results):
                if isinstance(result, Exception):
                    print(f"Batch processing error for document {doc.id}: {result}")
                    # Create an error placeholder result
                    error_result = NLPResult(
                        document_id=doc.id,
                        entities=[],
                        sentiment={},
                        categories=[],
                        topics=[],
                        quality_score=0.0
                    )
                    results.append(error_result)
                else:
                    results.append(result)
return results
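    # process_document() calls self._update_metrics(), whose body was not shown.
    # The following is a minimal sketch (an assumption, not the original
    # implementation) that maintains the counters declared in __init__.
    def _update_metrics(self, processing_time_ms: float, success: bool) -> None:
        """Update rolling latency, throughput, and error counters."""
        if not success:
            self.performance_metrics['error_count'] += 1
            return
        count = self.performance_metrics['documents_processed'] + 1
        prev_avg = self.performance_metrics['average_latency_ms']
        # Incremental mean avoids storing per-document latencies
        new_avg = prev_avg + (processing_time_ms - prev_avg) / count
        self.performance_metrics['documents_processed'] = count
        self.performance_metrics['average_latency_ms'] = new_avg
        if new_avg > 0:
            # Rough per-worker throughput derived from average latency
            self.performance_metrics['throughput_docs_per_sec'] = 1000.0 / new_avg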
class DefaultTextPreprocessor:
def __init__(self):
self.nlp = spacy.load("en_core_web_sm")
async def process(self, document: TextDocument) -> TextDocument:
"""Preprocess text document"""
text = document.text
# Basic cleaning
text = self._clean_text(text)
# Language detection
language = self._detect_language(text)
# Text normalization
text = self._normalize_text(text)
# Create processed document
processed_doc = TextDocument(
id=document.id,
text=text,
metadata=document.metadata.copy(),
language=language
)
return processed_doc
def _clean_text(self, text: str) -> str:
"""Clean and normalize text"""
# Remove excessive whitespace
text = re.sub(r'\s+', ' ', text)
# Remove control characters
text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)
# Normalize quotes
        text = re.sub(r'[\u201c\u201d\u201e\u201f]', '"', text)
        text = re.sub(r'[\u2018\u2019\u201a\u201b]', "'", text)
# Strip leading/trailing whitespace
text = text.strip()
return text
def _detect_language(self, text: str) -> str:
"""Detect text language"""
try:
from langdetect import detect
return detect(text)
        except Exception:
# Fallback to English
return 'en'
    def _normalize_text(self, text: str) -> str:
        """Normalize text for processing"""
        # Case is preserved here: downstream components such as the NER
        # processor are case-sensitive. Add a lowercased copy to metadata
        # if a specific processor needs one.
        return text
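# The pipeline constructor above falls back to DefaultPostprocessor, which is not
# shown elsewhere in this post. The sketch below is one plausible implementation
# (an assumption, not the original): it folds the per-processor dictionaries,
# keyed by processor class name, into a single NLPResult.
class DefaultPostprocessor:
    async def combine_results(self,
                              document: TextDocument,
                              processing_results: Dict[str, Dict[str, Any]]) -> NLPResult:
        """Merge individual processor outputs into one NLPResult."""
        ner = processing_results.get('NamedEntityProcessor', {})
        sentiment = processing_results.get('SentimentAnalysisProcessor', {})
        classification = processing_results.get('TextClassificationProcessor', {})
        embedding = processing_results.get('TextEmbeddingProcessor', {})
        return NLPResult(
            document_id=document.id,
            entities=ner.get('entities', []),
            sentiment=sentiment.get('sentiment', {}),
            categories=classification.get('categories', []),
            topics=[],  # no topic-modelling processor is defined in this post
            embeddings=embedding.get('embedding')
        )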
class NamedEntityProcessor(NLPProcessor):
def __init__(self, model_name: str = "en_core_web_sm"):
self.nlp = spacy.load(model_name)
self.entity_types = {
'PERSON': 'person',
'ORG': 'organization',
'GPE': 'location',
'MONEY': 'money',
'DATE': 'date',
'TIME': 'time'
}
async def process(self, document: TextDocument) -> Dict[str, Any]:
"""Extract named entities from text"""
# Process with spaCy
doc = self.nlp(document.text)
entities = []
for ent in doc.ents:
entity = {
'text': ent.text,
'label': self.entity_types.get(ent.label_, ent.label_),
'start_char': ent.start_char,
'end_char': ent.end_char,
'confidence': float(ent._.confidence) if hasattr(ent._, 'confidence') else 1.0
}
entities.append(entity)
# Deduplicate entities
entities = self._deduplicate_entities(entities)
return {
'entities': entities,
'entity_count': len(entities),
'unique_labels': list(set(ent['label'] for ent in entities))
}
def _deduplicate_entities(self, entities: List[Dict]) -> List[Dict]:
"""Remove duplicate entities"""
seen = set()
unique_entities = []
for entity in entities:
key = (entity['text'].lower(), entity['label'])
if key not in seen:
seen.add(key)
unique_entities.append(entity)
return unique_entities
class SentimentAnalysisProcessor(NLPProcessor):
def __init__(self, model_name: str = "cardiffnlp/twitter-roberta-base-sentiment-latest"):
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
self.model.eval()
# Label mapping
self.label_mapping = {0: 'negative', 1: 'neutral', 2: 'positive'}
async def process(self, document: TextDocument) -> Dict[str, Any]:
"""Analyze sentiment of text"""
text = document.text
        # Handle long texts by chunking; 512 here counts characters, a
        # conservative proxy for the tokenizer's 512-token limit
        if len(text) > 512:
sentiments = await self._analyze_chunks(text)
overall_sentiment = self._aggregate_chunk_sentiments(sentiments)
else:
overall_sentiment = await self._analyze_single_text(text)
return {
'sentiment': overall_sentiment,
'confidence': overall_sentiment.get('confidence', 0.0)
}
async def _analyze_single_text(self, text: str) -> Dict[str, Any]:
"""Analyze sentiment of single text"""
# Tokenize
inputs = self.tokenizer(
text,
truncation=True,
padding=True,
max_length=512,
return_tensors='pt'
)
# Get predictions
with torch.no_grad():
outputs = self.model(**inputs)
predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
# Extract results
scores = predictions[0].numpy()
predicted_label_id = scores.argmax()
predicted_label = self.label_mapping[predicted_label_id]
confidence = float(scores[predicted_label_id])
return {
'label': predicted_label,
'confidence': confidence,
'scores': {
self.label_mapping[i]: float(score)
for i, score in enumerate(scores)
}
}
async def _analyze_chunks(self, text: str) -> List[Dict]:
"""Analyze sentiment of text chunks"""
# Split text into overlapping chunks
chunk_size = 400
overlap = 50
chunks = []
for i in range(0, len(text), chunk_size - overlap):
chunk = text[i:i + chunk_size]
if chunk.strip():
chunks.append(chunk)
# Analyze each chunk
chunk_sentiments = []
for chunk in chunks:
sentiment = await self._analyze_single_text(chunk)
chunk_sentiments.append(sentiment)
return chunk_sentiments
def _aggregate_chunk_sentiments(self, chunk_sentiments: List[Dict]) -> Dict[str, Any]:
"""Aggregate sentiment scores across chunks"""
if not chunk_sentiments:
return {'label': 'neutral', 'confidence': 0.0, 'scores': {}}
# Weighted average based on confidence
total_weight = sum(cs['confidence'] for cs in chunk_sentiments)
if total_weight == 0:
return {'label': 'neutral', 'confidence': 0.0, 'scores': {}}
# Aggregate scores
aggregated_scores = {}
for label in ['negative', 'neutral', 'positive']:
weighted_sum = sum(
cs['scores'][label] * cs['confidence']
for cs in chunk_sentiments
if label in cs['scores']
)
aggregated_scores[label] = weighted_sum / total_weight
# Determine overall label
overall_label = max(aggregated_scores, key=aggregated_scores.get)
overall_confidence = aggregated_scores[overall_label]
return {
'label': overall_label,
'confidence': overall_confidence,
'scores': aggregated_scores
}
class TextClassificationProcessor(NLPProcessor):
def __init__(self,
model_name: str = "facebook/bart-large-mnli",
categories: List[str] = None):
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
self.model.eval()
self.categories = categories or [
"business", "technology", "sports", "entertainment",
"politics", "health", "science"
]
async def process(self, document: TextDocument) -> Dict[str, Any]:
"""Classify text into categories"""
text = document.text
# Zero-shot classification for each category
category_scores = {}
for category in self.categories:
score = await self._classify_for_category(text, category)
category_scores[category] = score
# Sort categories by score
sorted_categories = sorted(
category_scores.items(),
key=lambda x: x[1],
reverse=True
)
return {
'categories': [
{'label': cat, 'score': score}
for cat, score in sorted_categories
],
'top_category': sorted_categories[0][0] if sorted_categories else None,
'confidence': sorted_categories[0][1] if sorted_categories else 0.0
}
async def _classify_for_category(self, text: str, category: str) -> float:
"""Classify text for specific category using zero-shot"""
# Create hypothesis for zero-shot classification
hypothesis = f"This text is about {category}."
# Tokenize premise and hypothesis
inputs = self.tokenizer(
text,
hypothesis,
truncation=True,
padding=True,
max_length=512,
return_tensors='pt'
)
# Get prediction
with torch.no_grad():
outputs = self.model(**inputs)
predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
# Return entailment probability (indicates classification confidence)
entailment_score = float(predictions[0][2]) # Entailment class
return entailment_score
class TextEmbeddingProcessor(NLPProcessor):
def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
from sentence_transformers import SentenceTransformer
self.model = SentenceTransformer(model_name)
async def process(self, document: TextDocument) -> Dict[str, Any]:
"""Generate text embeddings"""
text = document.text
# Generate embedding
embedding = self.model.encode(text, convert_to_tensor=True)
return {
'embedding': embedding,
'embedding_dim': embedding.shape[0],
'embedding_norm': float(torch.norm(embedding))
}
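Putting the pipeline together is then a matter of instantiating the processors you need and awaiting the batch call. The snippet below is an illustrative sketch; the processor selection, sample text, and asyncio entry point are assumptions rather than a prescribed configuration.
# example_usage.py (illustrative; reuses the classes defined in nlp_pipeline.py)
import asyncio
from nlp_pipeline import (
    ProductionNLPPipeline, TextDocument,
    NamedEntityProcessor, SentimentAnalysisProcessor,
    TextClassificationProcessor, TextEmbeddingProcessor
)

async def main():
    pipeline = ProductionNLPPipeline(
        processors=[
            NamedEntityProcessor(),
            SentimentAnalysisProcessor(),
            TextClassificationProcessor(),
            TextEmbeddingProcessor()
        ],
        batch_size=16
    )
    documents = [
        TextDocument(
            id="doc_1",
            text="Acme Corp reported record quarterly revenue on Tuesday.",
            metadata={"source": "example"}
        )
    ]
    results = await pipeline.process_batch(documents)
    for result in results:
        print(result.document_id, result.sentiment, f"{result.processing_time_ms:.1f} ms")

if __name__ == "__main__":
    asyncio.run(main())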
Document Processing and Information Extraction
Advanced Document Processor
# document_processor.py
import re
from typing import Dict, List, Optional

class DocumentProcessor:
def __init__(self):
self.parsers = {
'pdf': PDFParser(),
'docx': DocxParser(),
'html': HTMLParser(),
'txt': TxtParser()
}
self.extractors = {
'tables': TableExtractor(),
'images': ImageExtractor(),
'forms': FormExtractor(),
'structure': StructureExtractor()
}
async def process_document(self,
file_path: str,
extract_options: Dict[str, bool] = None) -> Dict:
"""Process document and extract information"""
extract_options = extract_options or {
'text': True,
'tables': True,
'images': False,
'forms': True,
'structure': True
}
# Determine file type
file_type = self._get_file_type(file_path)
if file_type not in self.parsers:
raise ValueError(f"Unsupported file type: {file_type}")
# Parse document
parser = self.parsers[file_type]
parsed_doc = await parser.parse(file_path)
# Extract information
extraction_results = {}
if extract_options.get('text', True):
extraction_results['text'] = parsed_doc['text']
if extract_options.get('tables', False):
extraction_results['tables'] = await self.extractors['tables'].extract(parsed_doc)
if extract_options.get('images', False):
extraction_results['images'] = await self.extractors['images'].extract(parsed_doc)
if extract_options.get('forms', False):
extraction_results['forms'] = await self.extractors['forms'].extract(parsed_doc)
if extract_options.get('structure', False):
extraction_results['structure'] = await self.extractors['structure'].extract(parsed_doc)
return {
'file_path': file_path,
'file_type': file_type,
'metadata': parsed_doc.get('metadata', {}),
'extracted_content': extraction_results
}
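    # process_document() relies on self._get_file_type(), which was not shown above.
    # A minimal extension-based sketch (an assumption about the intended behaviour):
    def _get_file_type(self, file_path: str) -> str:
        """Infer the parser key from the file extension."""
        extension = file_path.rsplit('.', 1)[-1].lower() if '.' in file_path else ''
        # Map common aliases onto the parser keys registered in __init__
        aliases = {'htm': 'html', 'text': 'txt'}
        return aliases.get(extension, extension)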
class TableExtractor:
def __init__(self):
# Initialize table detection models
pass
async def extract(self, parsed_doc: Dict) -> List[Dict]:
"""Extract tables from document"""
tables = []
# Use multiple approaches for table detection
if 'html' in parsed_doc:
html_tables = self._extract_html_tables(parsed_doc['html'])
tables.extend(html_tables)
if 'text' in parsed_doc:
text_tables = self._extract_text_tables(parsed_doc['text'])
tables.extend(text_tables)
# Clean and validate tables
cleaned_tables = []
for table in tables:
if self._validate_table(table):
cleaned_table = self._clean_table(table)
cleaned_tables.append(cleaned_table)
return cleaned_tables
def _extract_html_tables(self, html_content: str) -> List[Dict]:
"""Extract tables from HTML content"""
import pandas as pd
from bs4 import BeautifulSoup
tables = []
try:
# Use pandas for HTML table parsing
dfs = pd.read_html(html_content)
for i, df in enumerate(dfs):
table = {
'id': f'html_table_{i}',
'type': 'html',
'headers': df.columns.tolist(),
'rows': df.values.tolist(),
'shape': df.shape
}
tables.append(table)
except Exception as e:
print(f"HTML table extraction error: {e}")
return tables
def _extract_text_tables(self, text: str) -> List[Dict]:
"""Extract tables from plain text using patterns"""
tables = []
# Look for table-like patterns
lines = text.split('\n')
potential_tables = []
current_table = []
for line in lines:
# Check if line looks like a table row
if self._is_table_row(line):
current_table.append(line)
else:
if len(current_table) >= 3: # Minimum table size
potential_tables.append(current_table)
current_table = []
# Add last table if exists
if len(current_table) >= 3:
potential_tables.append(current_table)
# Parse potential tables
for i, table_lines in enumerate(potential_tables):
parsed_table = self._parse_text_table(table_lines)
if parsed_table:
parsed_table['id'] = f'text_table_{i}'
parsed_table['type'] = 'text'
tables.append(parsed_table)
return tables
def _is_table_row(self, line: str) -> bool:
"""Check if line looks like a table row"""
        # Look for common table separators (two consecutive spaces rather than a
        # single space, so ordinary prose is not mistaken for a table row)
        separators = ['|', '\t', '  ', ',']
for sep in separators:
if line.count(sep) >= 2: # At least 2 separators for 3 columns
return True
return False
def _parse_text_table(self, lines: List[str]) -> Optional[Dict]:
"""Parse text lines into table structure"""
if not lines:
return None
# Determine separator
separator = self._detect_separator(lines)
if not separator:
return None
# Parse rows
rows = []
for line in lines:
cells = [cell.strip() for cell in line.split(separator)]
if cells and any(cell for cell in cells): # Non-empty row
rows.append(cells)
if len(rows) < 2:
return None
# First row as headers
headers = rows[0]
data_rows = rows[1:]
return {
'headers': headers,
'rows': data_rows,
'shape': (len(data_rows), len(headers)),
'separator': separator
}
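    # extract() and _parse_text_table() call three helpers that were not shown.
    # The following are minimal sketches (assumptions, not the original code).
    def _detect_separator(self, lines: List[str]) -> Optional[str]:
        """Pick the candidate separator that appears most consistently per line."""
        candidates = ['|', '\t', '  ', ',']
        best, best_count = None, 0
        for sep in candidates:
            counts = [line.count(sep) for line in lines]
            if counts and min(counts) >= 2 and min(counts) > best_count:
                best, best_count = sep, min(counts)
        return best

    def _validate_table(self, table: Dict) -> bool:
        """Keep only tables with at least two columns and one data row."""
        return len(table.get('headers', [])) >= 2 and len(table.get('rows', [])) >= 1

    def _clean_table(self, table: Dict) -> Dict:
        """Normalize headers and cells to stripped strings."""
        cleaned = dict(table)
        cleaned['headers'] = [str(h).strip() for h in table.get('headers', [])]
        cleaned['rows'] = [[str(c).strip() for c in row] for row in table.get('rows', [])]
        return cleaned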
class FormExtractor:
def __init__(self):
# Initialize form detection models
pass
async def extract(self, parsed_doc: Dict) -> List[Dict]:
"""Extract form fields and values from document"""
forms = []
text = parsed_doc.get('text', '')
# Pattern-based form field detection
form_patterns = [
r'(\w+(?:\s+\w+)*)\s*[:]\s*([^\n]+)', # Field: Value
r'(\w+(?:\s+\w+)*)\s*[=]\s*([^\n]+)', # Field = Value
r'(\w+(?:\s+\w+)*)\s*[-]\s*([^\n]+)', # Field - Value
]
extracted_fields = []
for pattern in form_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
for field_name, field_value in matches:
field = {
'field_name': field_name.strip(),
'field_value': field_value.strip(),
'confidence': self._calculate_field_confidence(field_name, field_value)
}
if field['confidence'] > 0.5: # Only include confident extractions
extracted_fields.append(field)
# Group fields into logical forms
if extracted_fields:
forms.append({
'id': 'extracted_form_0',
'type': 'pattern_based',
'fields': extracted_fields,
'field_count': len(extracted_fields)
})
return forms
def _calculate_field_confidence(self, field_name: str, field_value: str) -> float:
"""Calculate confidence score for extracted field"""
confidence = 0.5 # Base confidence
# Boost confidence for common field types
common_fields = ['name', 'email', 'phone', 'address', 'date', 'amount']
if any(common_field in field_name.lower() for common_field in common_fields):
confidence += 0.2
# Boost confidence for structured values
if re.match(r'^\d{4}-\d{2}-\d{2}$', field_value): # Date
confidence += 0.2
elif re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', field_value): # Email
confidence += 0.3
elif re.match(r'^\+?\d[\d\s\-\(\)]+$', field_value): # Phone
confidence += 0.2
# Penalize very short or very long values
if len(field_value) < 2:
confidence -= 0.3
elif len(field_value) > 100:
confidence -= 0.2
return min(1.0, max(0.0, confidence))
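To connect document parsing with the NLP pipeline from the previous section, wrap the extracted text in a TextDocument and feed it straight into the pipeline. The sketch below is illustrative; the module names and extraction options are assumptions.
# document_pipeline_glue.py (illustrative sketch)
from document_processor import DocumentProcessor
from nlp_pipeline import ProductionNLPPipeline, TextDocument

async def process_file(file_path: str, pipeline: ProductionNLPPipeline):
    """Parse a file, then run the extracted text through the NLP pipeline."""
    doc_processor = DocumentProcessor()
    extracted = await doc_processor.process_document(
        file_path,
        extract_options={'text': True, 'tables': True, 'forms': True, 'structure': False}
    )
    text_doc = TextDocument(
        id=file_path,
        text=extracted['extracted_content'].get('text', ''),
        metadata=extracted['metadata']
    )
    return await pipeline.process_document(text_doc)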
Text Quality Assessment and Monitoring
Text Quality Analyzer
# text_quality.py
import re
import torch
from typing import Dict, List

class TextQualityAnalyzer:
def __init__(self):
self.quality_metrics = [
'readability',
'coherence',
'completeness',
'language_detection_confidence',
'encoding_issues',
'noise_level'
]
def analyze_quality(self, text: str, metadata: Dict = None) -> Dict[str, float]:
"""Comprehensive text quality analysis"""
metrics = {}
# Basic text statistics
stats = self._calculate_text_stats(text)
# Readability metrics
metrics['readability'] = self._calculate_readability(text, stats)
# Coherence analysis
metrics['coherence'] = self._analyze_coherence(text)
# Completeness check
metrics['completeness'] = self._check_completeness(text, stats)
# Language detection confidence
metrics['language_confidence'] = self._language_detection_confidence(text)
# Encoding issues
metrics['encoding_quality'] = self._check_encoding_quality(text)
# Noise level
metrics['noise_level'] = 1.0 - self._calculate_noise_level(text)
# Overall quality score (weighted average)
weights = {
'readability': 0.2,
'coherence': 0.3,
'completeness': 0.2,
'language_confidence': 0.1,
'encoding_quality': 0.1,
'noise_level': 0.1
}
overall_score = sum(
metrics[metric] * weights[metric]
for metric in weights
if metric in metrics
)
metrics['overall_quality'] = overall_score
return metrics
def _calculate_text_stats(self, text: str) -> Dict:
"""Calculate basic text statistics"""
words = text.split()
sentences = re.split(r'[.!?]+', text)
sentences = [s.strip() for s in sentences if s.strip()]
return {
'char_count': len(text),
'word_count': len(words),
'sentence_count': len(sentences),
'avg_word_length': sum(len(word) for word in words) / len(words) if words else 0,
'avg_sentence_length': len(words) / len(sentences) if sentences else 0
}
def _calculate_readability(self, text: str, stats: Dict) -> float:
"""Calculate readability score (simplified Flesch-Kincaid)"""
if stats['sentence_count'] == 0 or stats['word_count'] == 0:
return 0.0
# Count syllables (approximation)
syllable_count = self._estimate_syllables(text)
# Flesch-Kincaid Grade Level formula
fk_score = (0.39 * (stats['word_count'] / stats['sentence_count']) +
11.8 * (syllable_count / stats['word_count']) - 15.59)
# Convert to 0-1 scale (higher is better, target grade level 8-10)
target_grade = 9
if fk_score <= target_grade:
return 1.0 - abs(fk_score - target_grade) / target_grade
else:
return max(0.0, 1.0 - (fk_score - target_grade) / 10)
def _estimate_syllables(self, text: str) -> int:
"""Estimate syllable count"""
# Simple syllable counting approximation
words = re.findall(r'\b\w+\b', text.lower())
total_syllables = 0
for word in words:
syllables = len(re.findall(r'[aeiou]+', word))
if word.endswith('e'):
syllables -= 1
if syllables == 0:
syllables = 1
total_syllables += syllables
return total_syllables
def _analyze_coherence(self, text: str) -> float:
"""Analyze text coherence using sentence similarity"""
sentences = re.split(r'[.!?]+', text)
sentences = [s.strip() for s in sentences if s.strip() and len(s) > 10]
if len(sentences) < 2:
return 1.0 # Single sentence is coherent
try:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
# Get sentence embeddings
embeddings = model.encode(sentences)
# Calculate pairwise similarities
similarities = []
for i in range(len(embeddings) - 1):
similarity = torch.cosine_similarity(
torch.tensor(embeddings[i]).unsqueeze(0),
torch.tensor(embeddings[i + 1]).unsqueeze(0)
)[0].item()
similarities.append(similarity)
# Average similarity as coherence measure
coherence_score = sum(similarities) / len(similarities)
# Normalize to 0-1 range
return max(0.0, min(1.0, (coherence_score + 1) / 2))
except ImportError:
# Fallback: simple keyword overlap
return self._simple_coherence_measure(sentences)
def _simple_coherence_measure(self, sentences: List[str]) -> float:
"""Simple coherence measure based on word overlap"""
if len(sentences) < 2:
return 1.0
overlaps = []
for i in range(len(sentences) - 1):
words1 = set(sentences[i].lower().split())
words2 = set(sentences[i + 1].lower().split())
if len(words1.union(words2)) > 0:
overlap = len(words1.intersection(words2)) / len(words1.union(words2))
overlaps.append(overlap)
return sum(overlaps) / len(overlaps) if overlaps else 0.0
def _check_completeness(self, text: str, stats: Dict) -> float:
"""Check text completeness"""
completeness_score = 1.0
# Check for truncation indicators
truncation_indicators = [
'...', '[truncated]', '[continued]', '(more)', 'see more'
]
text_lower = text.lower()
for indicator in truncation_indicators:
if indicator in text_lower:
completeness_score -= 0.3
break
# Check for minimum length
if stats['word_count'] < 10:
completeness_score -= 0.4
elif stats['word_count'] < 50:
completeness_score -= 0.2
# Check for proper ending
if not text.rstrip().endswith(('.', '!', '?', ':', ';')):
completeness_score -= 0.2
return max(0.0, completeness_score)
def _language_detection_confidence(self, text: str) -> float:
"""Get language detection confidence"""
try:
from langdetect import detect_langs
lang_probs = detect_langs(text)
if lang_probs:
return float(lang_probs[0].prob)
else:
return 0.5
except Exception:
return 0.5 # Default confidence
def _check_encoding_quality(self, text: str) -> float:
"""Check for encoding issues"""
quality_score = 1.0
        # Check for common mojibake artifacts (UTF-8 mis-decoded as Latin-1/CP1252)
        encoding_issues = [
            '\ufffd',                      # Unicode replacement character
            'â€™', 'â€œ',                  # curly quotes after a bad decode
            'Ã¡', 'Ã©', 'Ã­', 'Ã³', 'Ãº',  # accented vowels after a bad decode
        ]
for issue in encoding_issues:
if issue in text:
quality_score -= 0.3
break
# Check for excessive non-printable characters
non_printable = sum(1 for c in text if ord(c) < 32 and c not in '\n\r\t')
if non_printable > len(text) * 0.01: # More than 1% non-printable
quality_score -= 0.4
return max(0.0, quality_score)
def _calculate_noise_level(self, text: str) -> float:
"""Calculate text noise level"""
noise_indicators = 0
total_indicators = 0
# Check for excessive punctuation
        punctuation_ratio = sum(1 for c in text if c in '!@#$%^&*()_+{}[]|;:,.<>?') / max(1, len(text))
noise_indicators += min(1.0, punctuation_ratio * 10)
total_indicators += 1
# Check for excessive uppercase
uppercase_ratio = sum(1 for c in text if c.isupper()) / max(1, len(text))
if uppercase_ratio > 0.3: # More than 30% uppercase
noise_indicators += min(1.0, uppercase_ratio)
total_indicators += 1
# Check for repetitive patterns
words = text.split()
if len(words) > 10:
word_frequency = {}
for word in words:
word_frequency[word] = word_frequency.get(word, 0) + 1
max_frequency = max(word_frequency.values())
repetition_ratio = max_frequency / len(words)
if repetition_ratio > 0.1: # Same word appears more than 10% of the time
noise_indicators += min(1.0, repetition_ratio * 5)
total_indicators += 1
return noise_indicators / total_indicators if total_indicators > 0 else 0.0
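In practice the analyzer is most useful as a gate in front of the pipeline: documents below a quality threshold get routed to review instead of consuming model capacity. A minimal sketch, assuming TextDocument inputs and an illustrative 0.6 threshold:
def filter_low_quality(documents, analyzer=None, threshold: float = 0.6):
    """Split documents into (accepted, rejected) based on overall quality."""
    analyzer = analyzer or TextQualityAnalyzer()
    accepted, rejected = [], []
    for doc in documents:
        metrics = analyzer.analyze_quality(doc.text, doc.metadata)
        if metrics['overall_quality'] >= threshold:
            accepted.append(doc)
        else:
            # Keep the metrics alongside the document for later inspection
            rejected.append((doc, metrics))
    return accepted, rejected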
Production Deployment and Scaling
NLP Model Serving
# nlp_serving.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import asyncio
import logging
import time
import uvicorn
from typing import Any, Dict, List, Optional

from nlp_pipeline import NLPResult, ProductionNLPPipeline, TextDocument
class TextProcessingRequest(BaseModel):
texts: List[str]
options: Optional[Dict[str, Any]] = None
callback_url: Optional[str] = None
class ProcessingResponse(BaseModel):
    # NLPResult is a plain dataclass (it can carry tensors), so Pydantic must be
    # told to accept it as an arbitrary type (assumes Pydantic v2).
    model_config = {"arbitrary_types_allowed": True}

    request_id: str
    status: str
    results: Optional[List[NLPResult]] = None
    error: Optional[str] = None
class NLPServingAPI:
def __init__(self, pipeline: ProductionNLPPipeline):
self.app = FastAPI(title="NLP Processing API")
self.pipeline = pipeline
self.request_queue = asyncio.Queue(maxsize=1000)
self.results_store = {}
self.setup_routes()
def setup_routes(self):
@self.app.post("/process", response_model=ProcessingResponse)
async def process_texts(request: TextProcessingRequest):
"""Process texts synchronously"""
try:
# Convert texts to documents
documents = [
TextDocument(
id=f"doc_{i}",
text=text,
metadata={'request_timestamp': time.time()}
)
for i, text in enumerate(request.texts)
]
# Process documents
results = await self.pipeline.process_batch(documents)
return ProcessingResponse(
request_id=f"sync_{int(time.time())}",
status="completed",
results=results
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.post("/process_async", response_model=Dict[str, str])
async def process_texts_async(
request: TextProcessingRequest,
background_tasks: BackgroundTasks
):
"""Process texts asynchronously"""
request_id = f"async_{int(time.time())}"
# Add to background processing
background_tasks.add_task(
self._process_async,
request_id,
request.texts,
request.options,
request.callback_url
)
return {"request_id": request_id, "status": "queued"}
@self.app.get("/status/{request_id}")
async def get_processing_status(request_id: str):
"""Get processing status"""
if request_id in self.results_store:
return self.results_store[request_id]
else:
return {"request_id": request_id, "status": "not_found"}
@self.app.get("/health")
async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"queue_size": self.request_queue.qsize(),
"performance_metrics": self.pipeline.performance_metrics
}
async def _process_async(self,
request_id: str,
texts: List[str],
options: Optional[Dict] = None,
callback_url: Optional[str] = None):
"""Process texts asynchronously"""
try:
# Store initial status
self.results_store[request_id] = {
"request_id": request_id,
"status": "processing",
"started_at": time.time()
}
# Convert texts to documents
documents = [
TextDocument(
id=f"doc_{i}",
text=text,
metadata={'request_id': request_id}
)
for i, text in enumerate(texts)
]
# Process documents
results = await self.pipeline.process_batch(documents)
# Store results
self.results_store[request_id] = {
"request_id": request_id,
"status": "completed",
"results": results,
"completed_at": time.time()
}
# Send callback if provided
if callback_url:
await self._send_callback(callback_url, self.results_store[request_id])
except Exception as e:
self.results_store[request_id] = {
"request_id": request_id,
"status": "error",
"error": str(e),
"failed_at": time.time()
}
async def _send_callback(self, callback_url: str, result: Dict):
"""Send callback notification"""
try:
import aiohttp
async with aiohttp.ClientSession() as session:
await session.post(callback_url, json=result)
except Exception as e:
logging.error(f"Callback failed: {e}")
def start_server(self, host: str = "0.0.0.0", port: int = 8000):
"""Start the API server"""
uvicorn.run(self.app, host=host, port=port)
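A minimal entry point that wires the pipeline into the API might look like the sketch below; the processor list, host, and port are illustrative assumptions.
# serve.py (illustrative sketch)
from nlp_pipeline import (
    ProductionNLPPipeline,
    NamedEntityProcessor,
    SentimentAnalysisProcessor,
    TextClassificationProcessor
)
from nlp_serving import NLPServingAPI

if __name__ == "__main__":
    # Build the pipeline once at startup so models are loaded a single time
    pipeline = ProductionNLPPipeline(
        processors=[
            NamedEntityProcessor(),
            SentimentAnalysisProcessor(),
            TextClassificationProcessor()
        ]
    )
    api = NLPServingAPI(pipeline)
    api.start_server(host="0.0.0.0", port=8000)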
Best Practices Checklist
Normalize and clean text (whitespace, control characters, quotes) before any model sees it.
Detect language early and route or reject unsupported documents.
Chunk long inputs and aggregate results rather than silently truncating.
Batch documents and bound concurrency with a fixed worker pool.
Score text quality (readability, coherence, completeness, encoding, noise) and gate low-quality inputs.
Expose both synchronous and asynchronous endpoints, plus health and status checks.
Track latency, throughput, and error counts for every pipeline stage.
Return per-document error results instead of failing an entire batch.
Conclusion
Production NLP systems require careful orchestration of preprocessing, model serving, quality assessment, and monitoring. By implementing robust text processing pipelines, efficient model serving architectures, and comprehensive quality monitoring, you can build NLP systems that perform reliably at scale. Remember that text data is inherently noisy and varied—building systems that gracefully handle this variability while maintaining consistent performance is key to success in production environments.