Build Scalable NLP Text Processing Systems

David Childs

Master production NLP systems including text classification, named entity recognition, sentiment analysis, and document processing pipelines at scale.

Natural Language Processing has evolved from academic experiments to production systems processing billions of text documents daily. After building NLP pipelines for customer service, content analysis, and document processing, I've learned that production NLP requires robust preprocessing, efficient model serving, and comprehensive text quality monitoring. Here's your complete guide to production-ready text processing systems.

NLP Pipeline Architecture

Production Text Processing Framework

# nlp_pipeline.py
import spacy
import torch
from transformers import AutoTokenizer, AutoModel, AutoModelForSequenceClassification
from typing import Dict, List, Tuple, Any, Optional, Union
from dataclasses import dataclass
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
import re
from abc import ABC, abstractmethod

@dataclass
class TextDocument:
    id: str
    text: str
    metadata: Dict[str, Any]
    language: Optional[str] = None
    encoding: str = 'utf-8'

@dataclass
class NLPResult:
    document_id: str
    entities: List[Dict]
    sentiment: Dict[str, float]
    categories: List[Dict]
    topics: List[Dict]
    embeddings: Optional[torch.Tensor] = None
    processing_time_ms: float = 0.0
    quality_score: float = 1.0

class NLPProcessingError(Exception):
    """Raised when a document fails pipeline processing."""
    pass

class NLPProcessor(ABC):
    @abstractmethod
    async def process(self, document: TextDocument) -> Dict[str, Any]:
        pass

class ProductionNLPPipeline:
    def __init__(self, 
                 preprocessor=None,
                 processors: List[NLPProcessor] = None,
                 postprocessor=None,
                 batch_size: int = 32,
                 max_workers: int = 4):
        
        self.preprocessor = preprocessor or DefaultTextPreprocessor()
        self.processors = processors or []
        self.postprocessor = postprocessor or DefaultPostprocessor()
        self.batch_size = batch_size
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        
        # Performance monitoring
        self.performance_metrics = {
            'documents_processed': 0,
            'average_latency_ms': 0,
            'throughput_docs_per_sec': 0,
            'error_count': 0
        }
    
    async def process_document(self, document: TextDocument) -> NLPResult:
        """Process single document through complete NLP pipeline"""
        
        start_time = time.time()
        
        try:
            # Preprocessing
            preprocessed_doc = await self.preprocessor.process(document)
            
            # Run all NLP processors
            processing_results = {}
            for processor in self.processors:
                processor_name = processor.__class__.__name__
                result = await processor.process(preprocessed_doc)
                processing_results[processor_name] = result
            
            # Postprocessing
            final_result = await self.postprocessor.combine_results(
                document, processing_results
            )
            
            # Calculate processing time
            processing_time = (time.time() - start_time) * 1000
            final_result.processing_time_ms = processing_time
            
            # Update metrics
            self._update_metrics(processing_time, True)
            
            return final_result
            
        except Exception as e:
            self._update_metrics(0, False)
            raise NLPProcessingError(f"Pipeline processing failed: {str(e)}")
    
    async def process_batch(self, documents: List[TextDocument]) -> List[NLPResult]:
        """Process batch of documents efficiently"""
        
        results = []
        
        # Process in batches
        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]
            
            # Process batch in parallel
            tasks = [
                self.process_document(doc)
                for doc in batch
            ]
            
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Handle exceptions, keeping results aligned with input documents
            for doc, result in zip(batch, batch_results):
                if isinstance(result, Exception):
                    print(f"Batch processing error for {doc.id}: {result}")
                    # Create placeholder result for the failed document
                    error_result = NLPResult(
                        document_id=doc.id,
                        entities=[],
                        sentiment={},
                        categories=[],
                        topics=[],
                        quality_score=0.0
                    )
                    results.append(error_result)
                else:
                    results.append(result)
        
        return results
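
    def _update_metrics(self, processing_time_ms: float, success: bool):
        """Update rolling performance metrics after each document.

        NOTE: _update_metrics is called above but was not shown in the
        original listing; this is a minimal sketch, assumed to maintain the
        performance_metrics dictionary initialized in __init__.
        """
        if success:
            count = self.performance_metrics['documents_processed']
            prev_avg = self.performance_metrics['average_latency_ms']
            # Incremental mean of per-document latency
            new_avg = (prev_avg * count + processing_time_ms) / (count + 1)
            self.performance_metrics['documents_processed'] = count + 1
            self.performance_metrics['average_latency_ms'] = new_avg
            if new_avg > 0:
                self.performance_metrics['throughput_docs_per_sec'] = 1000.0 / new_avg
        else:
            self.performance_metrics['error_count'] += 1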

class DefaultTextPreprocessor:
    def __init__(self):
        self.nlp = spacy.load("en_core_web_sm")
        
    async def process(self, document: TextDocument) -> TextDocument:
        """Preprocess text document"""
        
        text = document.text
        
        # Basic cleaning
        text = self._clean_text(text)
        
        # Language detection
        language = self._detect_language(text)
        
        # Text normalization
        text = self._normalize_text(text)
        
        # Create processed document
        processed_doc = TextDocument(
            id=document.id,
            text=text,
            metadata=document.metadata.copy(),
            language=language
        )
        
        return processed_doc
    
    def _clean_text(self, text: str) -> str:
        """Clean and normalize text"""
        
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        
        # Remove control characters
        text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)
        
        # Normalize quotes
        # Normalize curly/smart quotes to their ASCII equivalents
        text = re.sub(r'[\u201c\u201d\u201e\u201f]', '"', text)
        text = re.sub(r'[\u2018\u2019\u201a\u201b]', "'", text)
        
        # Strip leading/trailing whitespace
        text = text.strip()
        
        return text
    
    def _detect_language(self, text: str) -> str:
        """Detect text language"""
        
        try:
            from langdetect import detect
            return detect(text)
        except Exception:
            # Fallback to English
            return 'en'
    
    def _normalize_text(self, text: str) -> str:
        """Normalize text for processing"""
        
        # Placeholder for additional normalization (e.g. Unicode NFC).
        # Original case is preserved so downstream NER keeps capitalization cues.
        return text
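
class DefaultPostprocessor:
    """Combine per-processor outputs into a single NLPResult.

    NOTE: DefaultPostprocessor is referenced in ProductionNLPPipeline but was
    not shown in the original listing; this is a minimal sketch, assuming the
    processor classes below and the result keys they return.
    """
    
    async def combine_results(self,
                              document: TextDocument,
                              processing_results: Dict[str, Dict]) -> NLPResult:
        ner = processing_results.get('NamedEntityProcessor', {})
        sentiment = processing_results.get('SentimentAnalysisProcessor', {})
        classification = processing_results.get('TextClassificationProcessor', {})
        embedding = processing_results.get('TextEmbeddingProcessor', {})
        
        return NLPResult(
            document_id=document.id,
            entities=ner.get('entities', []),
            sentiment=sentiment.get('sentiment', {}),
            categories=classification.get('categories', []),
            topics=[],  # topic modeling is not included in this listing
            embeddings=embedding.get('embedding')
        )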

class NamedEntityProcessor(NLPProcessor):
    def __init__(self, model_name: str = "en_core_web_sm"):
        self.nlp = spacy.load(model_name)
        self.entity_types = {
            'PERSON': 'person',
            'ORG': 'organization', 
            'GPE': 'location',
            'MONEY': 'money',
            'DATE': 'date',
            'TIME': 'time'
        }
    
    async def process(self, document: TextDocument) -> Dict[str, Any]:
        """Extract named entities from text"""
        
        # Process with spaCy
        doc = self.nlp(document.text)
        
        entities = []
        for ent in doc.ents:
            entity = {
                'text': ent.text,
                'label': self.entity_types.get(ent.label_, ent.label_),
                'start_char': ent.start_char,
                'end_char': ent.end_char,
                'confidence': float(ent._.confidence) if hasattr(ent._, 'confidence') else 1.0
            }
            entities.append(entity)
        
        # Deduplicate entities
        entities = self._deduplicate_entities(entities)
        
        return {
            'entities': entities,
            'entity_count': len(entities),
            'unique_labels': list(set(ent['label'] for ent in entities))
        }
    
    def _deduplicate_entities(self, entities: List[Dict]) -> List[Dict]:
        """Remove duplicate entities"""
        
        seen = set()
        unique_entities = []
        
        for entity in entities:
            key = (entity['text'].lower(), entity['label'])
            if key not in seen:
                seen.add(key)
                unique_entities.append(entity)
        
        return unique_entities

class SentimentAnalysisProcessor(NLPProcessor):
    def __init__(self, model_name: str = "cardiffnlp/twitter-roberta-base-sentiment-latest"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        self.model.eval()
        
        # Label mapping
        self.label_mapping = {0: 'negative', 1: 'neutral', 2: 'positive'}
    
    async def process(self, document: TextDocument) -> Dict[str, Any]:
        """Analyze sentiment of text"""
        
        text = document.text
        
        # Chunk long inputs; character length is used here as a rough
        # proxy for the model's 512-token limit
        if len(text) > 512:
            sentiments = await self._analyze_chunks(text)
            overall_sentiment = self._aggregate_chunk_sentiments(sentiments)
        else:
            overall_sentiment = await self._analyze_single_text(text)
        
        return {
            'sentiment': overall_sentiment,
            'confidence': overall_sentiment.get('confidence', 0.0)
        }
    
    async def _analyze_single_text(self, text: str) -> Dict[str, Any]:
        """Analyze sentiment of single text"""
        
        # Tokenize
        inputs = self.tokenizer(
            text,
            truncation=True,
            padding=True,
            max_length=512,
            return_tensors='pt'
        )
        
        # Get predictions
        with torch.no_grad():
            outputs = self.model(**inputs)
            predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
        
        # Extract results
        scores = predictions[0].numpy()
        predicted_label_id = scores.argmax()
        predicted_label = self.label_mapping[predicted_label_id]
        confidence = float(scores[predicted_label_id])
        
        return {
            'label': predicted_label,
            'confidence': confidence,
            'scores': {
                self.label_mapping[i]: float(score)
                for i, score in enumerate(scores)
            }
        }
    
    async def _analyze_chunks(self, text: str) -> List[Dict]:
        """Analyze sentiment of text chunks"""
        
        # Split text into overlapping chunks
        chunk_size = 400
        overlap = 50
        chunks = []
        
        for i in range(0, len(text), chunk_size - overlap):
            chunk = text[i:i + chunk_size]
            if chunk.strip():
                chunks.append(chunk)
        
        # Analyze each chunk
        chunk_sentiments = []
        for chunk in chunks:
            sentiment = await self._analyze_single_text(chunk)
            chunk_sentiments.append(sentiment)
        
        return chunk_sentiments
    
    def _aggregate_chunk_sentiments(self, chunk_sentiments: List[Dict]) -> Dict[str, Any]:
        """Aggregate sentiment scores across chunks"""
        
        if not chunk_sentiments:
            return {'label': 'neutral', 'confidence': 0.0, 'scores': {}}
        
        # Weighted average based on confidence
        total_weight = sum(cs['confidence'] for cs in chunk_sentiments)
        
        if total_weight == 0:
            return {'label': 'neutral', 'confidence': 0.0, 'scores': {}}
        
        # Aggregate scores
        aggregated_scores = {}
        for label in ['negative', 'neutral', 'positive']:
            weighted_sum = sum(
                cs['scores'][label] * cs['confidence']
                for cs in chunk_sentiments
                if label in cs['scores']
            )
            aggregated_scores[label] = weighted_sum / total_weight
        
        # Determine overall label
        overall_label = max(aggregated_scores, key=aggregated_scores.get)
        overall_confidence = aggregated_scores[overall_label]
        
        return {
            'label': overall_label,
            'confidence': overall_confidence,
            'scores': aggregated_scores
        }

class TextClassificationProcessor(NLPProcessor):
    def __init__(self, 
                 model_name: str = "facebook/bart-large-mnli",
                 categories: List[str] = None):
        
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        self.model.eval()
        
        self.categories = categories or [
            "business", "technology", "sports", "entertainment", 
            "politics", "health", "science"
        ]
    
    async def process(self, document: TextDocument) -> Dict[str, Any]:
        """Classify text into categories"""
        
        text = document.text
        
        # Zero-shot classification for each category
        category_scores = {}
        
        for category in self.categories:
            score = await self._classify_for_category(text, category)
            category_scores[category] = score
        
        # Sort categories by score
        sorted_categories = sorted(
            category_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )
        
        return {
            'categories': [
                {'label': cat, 'score': score}
                for cat, score in sorted_categories
            ],
            'top_category': sorted_categories[0][0] if sorted_categories else None,
            'confidence': sorted_categories[0][1] if sorted_categories else 0.0
        }
    
    async def _classify_for_category(self, text: str, category: str) -> float:
        """Classify text for specific category using zero-shot"""
        
        # Create hypothesis for zero-shot classification
        hypothesis = f"This text is about {category}."
        
        # Tokenize premise and hypothesis
        inputs = self.tokenizer(
            text,
            hypothesis,
            truncation=True,
            padding=True,
            max_length=512,
            return_tensors='pt'
        )
        
        # Get prediction
        with torch.no_grad():
            outputs = self.model(**inputs)
            predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
        
        # Return entailment probability (indicates classification confidence)
        entailment_score = float(predictions[0][2])  # Entailment class
        
        return entailment_score

class TextEmbeddingProcessor(NLPProcessor):
    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer(model_name)
        
    async def process(self, document: TextDocument) -> Dict[str, Any]:
        """Generate text embeddings"""
        
        text = document.text
        
        # Generate embedding
        embedding = self.model.encode(text, convert_to_tensor=True)
        
        return {
            'embedding': embedding,
            'embedding_dim': embedding.shape[0],
            'embedding_norm': float(torch.norm(embedding))
        }
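
Wiring the pieces together is straightforward. The sketch below (assuming the classes above live in nlp_pipeline.py, as the listing's file header suggests) builds a pipeline with NER, sentiment, and embedding processors and runs a small batch; the file name and sample documents are illustrative.

# pipeline_usage.py (illustrative sketch)
import asyncio
from nlp_pipeline import (
    ProductionNLPPipeline, TextDocument,
    NamedEntityProcessor, SentimentAnalysisProcessor, TextEmbeddingProcessor
)

async def main():
    pipeline = ProductionNLPPipeline(
        processors=[
            NamedEntityProcessor(),
            SentimentAnalysisProcessor(),
            TextEmbeddingProcessor()
        ],
        batch_size=16
    )
    
    documents = [
        TextDocument(id="doc_1",
                     text="Acme Corp reported record earnings on Tuesday.",
                     metadata={"source": "news"}),
        TextDocument(id="doc_2",
                     text="The support experience was slow and frustrating.",
                     metadata={"source": "ticket"})
    ]
    
    results = await pipeline.process_batch(documents)
    for result in results:
        print(result.document_id, result.sentiment, result.entities)

if __name__ == "__main__":
    asyncio.run(main())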

Document Processing and Information Extraction

Advanced Document Processor

# document_processor.py
import re
from typing import Dict, List, Optional

class DocumentProcessor:
    def __init__(self):
        self.parsers = {
            'pdf': PDFParser(),
            'docx': DocxParser(),
            'html': HTMLParser(),
            'txt': TxtParser()
        }
        
        self.extractors = {
            'tables': TableExtractor(),
            'images': ImageExtractor(),
            'forms': FormExtractor(),
            'structure': StructureExtractor()
        }
    
    async def process_document(self, 
                             file_path: str,
                             extract_options: Dict[str, bool] = None) -> Dict:
        """Process document and extract information"""
        
        extract_options = extract_options or {
            'text': True,
            'tables': True,
            'images': False,
            'forms': True,
            'structure': True
        }
        
        # Determine file type
        file_type = self._get_file_type(file_path)
        
        if file_type not in self.parsers:
            raise ValueError(f"Unsupported file type: {file_type}")
        
        # Parse document
        parser = self.parsers[file_type]
        parsed_doc = await parser.parse(file_path)
        
        # Extract information
        extraction_results = {}
        
        if extract_options.get('text', True):
            extraction_results['text'] = parsed_doc['text']
        
        if extract_options.get('tables', False):
            extraction_results['tables'] = await self.extractors['tables'].extract(parsed_doc)
        
        if extract_options.get('images', False):
            extraction_results['images'] = await self.extractors['images'].extract(parsed_doc)
        
        if extract_options.get('forms', False):
            extraction_results['forms'] = await self.extractors['forms'].extract(parsed_doc)
        
        if extract_options.get('structure', False):
            extraction_results['structure'] = await self.extractors['structure'].extract(parsed_doc)
        
        return {
            'file_path': file_path,
            'file_type': file_type,
            'metadata': parsed_doc.get('metadata', {}),
            'extracted_content': extraction_results
        }
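
    def _get_file_type(self, file_path: str) -> str:
        # NOTE: _get_file_type is called above but not shown in the original
        # listing; a minimal sketch (assumed) that maps the file extension to
        # a parser key.
        extension = file_path.rsplit('.', 1)[-1].lower() if '.' in file_path else ''
        return {'htm': 'html', 'text': 'txt'}.get(extension, extension)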

class TableExtractor:
    def __init__(self):
        # Initialize table detection models
        pass
    
    async def extract(self, parsed_doc: Dict) -> List[Dict]:
        """Extract tables from document"""
        
        tables = []
        
        # Use multiple approaches for table detection
        if 'html' in parsed_doc:
            html_tables = self._extract_html_tables(parsed_doc['html'])
            tables.extend(html_tables)
        
        if 'text' in parsed_doc:
            text_tables = self._extract_text_tables(parsed_doc['text'])
            tables.extend(text_tables)
        
        # Clean and validate tables
        cleaned_tables = []
        for table in tables:
            if self._validate_table(table):
                cleaned_table = self._clean_table(table)
                cleaned_tables.append(cleaned_table)
        
        return cleaned_tables
    
    def _extract_html_tables(self, html_content: str) -> List[Dict]:
        """Extract tables from HTML content"""
        
        import pandas as pd
        from io import StringIO
        
        tables = []
        
        try:
            # Use pandas for HTML table parsing (StringIO avoids the
            # deprecation warning for passing literal HTML)
            dfs = pd.read_html(StringIO(html_content))
            
            for i, df in enumerate(dfs):
                table = {
                    'id': f'html_table_{i}',
                    'type': 'html',
                    'headers': df.columns.tolist(),
                    'rows': df.values.tolist(),
                    'shape': df.shape
                }
                tables.append(table)
        
        except Exception as e:
            print(f"HTML table extraction error: {e}")
        
        return tables
    
    def _extract_text_tables(self, text: str) -> List[Dict]:
        """Extract tables from plain text using patterns"""
        
        tables = []
        
        # Look for table-like patterns
        lines = text.split('\n')
        potential_tables = []
        current_table = []
        
        for line in lines:
            # Check if line looks like a table row
            if self._is_table_row(line):
                current_table.append(line)
            else:
                if len(current_table) >= 3:  # Minimum table size
                    potential_tables.append(current_table)
                current_table = []
        
        # Add last table if exists
        if len(current_table) >= 3:
            potential_tables.append(current_table)
        
        # Parse potential tables
        for i, table_lines in enumerate(potential_tables):
            parsed_table = self._parse_text_table(table_lines)
            if parsed_table:
                parsed_table['id'] = f'text_table_{i}'
                parsed_table['type'] = 'text'
                tables.append(parsed_table)
        
        return tables
    
    def _is_table_row(self, line: str) -> bool:
        """Check if line looks like a table row"""
        
        # Look for common table separators
        separators = ['|', '\t', '  ', ',']
        
        for sep in separators:
            if line.count(sep) >= 2:  # At least 2 separators for 3 columns
                return True
        
        return False
    
    def _parse_text_table(self, lines: List[str]) -> Optional[Dict]:
        """Parse text lines into table structure"""
        
        if not lines:
            return None
        
        # Determine separator
        separator = self._detect_separator(lines)
        
        if not separator:
            return None
        
        # Parse rows
        rows = []
        for line in lines:
            cells = [cell.strip() for cell in line.split(separator)]
            if cells and any(cell for cell in cells):  # Non-empty row
                rows.append(cells)
        
        if len(rows) < 2:
            return None
        
        # First row as headers
        headers = rows[0]
        data_rows = rows[1:]
        
        return {
            'headers': headers,
            'rows': data_rows,
            'shape': (len(data_rows), len(headers)),
            'separator': separator
        }
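
    # NOTE: the following helpers are referenced above but were not shown in
    # the original listing; these are minimal sketches with assumed behavior.
    def _detect_separator(self, lines: List[str]) -> Optional[str]:
        """Pick the separator that appears most consistently across lines."""
        candidates = ['|', '\t', ',', '  ']
        best_sep, best_count = None, 0
        for sep in candidates:
            counts = [line.count(sep) for line in lines]
            if counts and min(counts) >= 2 and min(counts) > best_count:
                best_sep, best_count = sep, min(counts)
        return best_sep
    
    def _validate_table(self, table: Dict) -> bool:
        """Require at least two columns and one data row."""
        headers = table.get('headers', [])
        rows = table.get('rows', [])
        return len(headers) >= 2 and len(rows) >= 1
    
    def _clean_table(self, table: Dict) -> Dict:
        """Normalize cell values to stripped strings."""
        cleaned_rows = [
            [str(cell).strip() for cell in row]
            for row in table.get('rows', [])
        ]
        return {**table, 'rows': cleaned_rows}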

class FormExtractor:
    def __init__(self):
        # Initialize form detection models
        pass
    
    async def extract(self, parsed_doc: Dict) -> List[Dict]:
        """Extract form fields and values from document"""
        
        forms = []
        
        text = parsed_doc.get('text', '')
        
        # Pattern-based form field detection
        form_patterns = [
            r'(\w+(?:\s+\w+)*)\s*[:]\s*([^\n]+)',  # Field: Value
            r'(\w+(?:\s+\w+)*)\s*[=]\s*([^\n]+)',  # Field = Value
            r'(\w+(?:\s+\w+)*)\s*[-]\s*([^\n]+)',  # Field - Value
        ]
        
        extracted_fields = []
        
        for pattern in form_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            
            for field_name, field_value in matches:
                field = {
                    'field_name': field_name.strip(),
                    'field_value': field_value.strip(),
                    'confidence': self._calculate_field_confidence(field_name, field_value)
                }
                
                if field['confidence'] > 0.5:  # Only include confident extractions
                    extracted_fields.append(field)
        
        # Group fields into logical forms
        if extracted_fields:
            forms.append({
                'id': 'extracted_form_0',
                'type': 'pattern_based',
                'fields': extracted_fields,
                'field_count': len(extracted_fields)
            })
        
        return forms
    
    def _calculate_field_confidence(self, field_name: str, field_value: str) -> float:
        """Calculate confidence score for extracted field"""
        
        confidence = 0.5  # Base confidence
        
        # Boost confidence for common field types
        common_fields = ['name', 'email', 'phone', 'address', 'date', 'amount']
        if any(common_field in field_name.lower() for common_field in common_fields):
            confidence += 0.2
        
        # Boost confidence for structured values
        if re.match(r'^\d{4}-\d{2}-\d{2}$', field_value):  # Date
            confidence += 0.2
        elif re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', field_value):  # Email
            confidence += 0.3
        elif re.match(r'^\+?\d[\d\s\-\(\)]+$', field_value):  # Phone
            confidence += 0.2
        
        # Penalize very short or very long values
        if len(field_value) < 2:
            confidence -= 0.3
        elif len(field_value) > 100:
            confidence -= 0.2
        
        return min(1.0, max(0.0, confidence))

Text Quality Assessment and Monitoring

Text Quality Analyzer

# text_quality.py
import re
import torch
from typing import Dict, List

class TextQualityAnalyzer:
    def __init__(self):
        self.quality_metrics = [
            'readability',
            'coherence', 
            'completeness',
            'language_confidence',
            'encoding_quality',
            'noise_level'
        ]
    
    def analyze_quality(self, text: str, metadata: Dict = None) -> Dict[str, float]:
        """Comprehensive text quality analysis"""
        
        metrics = {}
        
        # Basic text statistics
        stats = self._calculate_text_stats(text)
        
        # Readability metrics
        metrics['readability'] = self._calculate_readability(text, stats)
        
        # Coherence analysis
        metrics['coherence'] = self._analyze_coherence(text)
        
        # Completeness check
        metrics['completeness'] = self._check_completeness(text, stats)
        
        # Language detection confidence
        metrics['language_confidence'] = self._language_detection_confidence(text)
        
        # Encoding issues
        metrics['encoding_quality'] = self._check_encoding_quality(text)
        
        # Noise level (inverted so that a higher score means cleaner text)
        metrics['noise_level'] = 1.0 - self._calculate_noise_level(text)
        
        # Overall quality score (weighted average)
        weights = {
            'readability': 0.2,
            'coherence': 0.3,
            'completeness': 0.2,
            'language_confidence': 0.1,
            'encoding_quality': 0.1,
            'noise_level': 0.1
        }
        
        overall_score = sum(
            metrics[metric] * weights[metric]
            for metric in weights
            if metric in metrics
        )
        
        metrics['overall_quality'] = overall_score
        
        return metrics
    
    def _calculate_text_stats(self, text: str) -> Dict:
        """Calculate basic text statistics"""
        
        words = text.split()
        sentences = re.split(r'[.!?]+', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        
        return {
            'char_count': len(text),
            'word_count': len(words),
            'sentence_count': len(sentences),
            'avg_word_length': sum(len(word) for word in words) / len(words) if words else 0,
            'avg_sentence_length': len(words) / len(sentences) if sentences else 0
        }
    
    def _calculate_readability(self, text: str, stats: Dict) -> float:
        """Calculate readability score (simplified Flesch-Kincaid)"""
        
        if stats['sentence_count'] == 0 or stats['word_count'] == 0:
            return 0.0
        
        # Count syllables (approximation)
        syllable_count = self._estimate_syllables(text)
        
        # Flesch-Kincaid Grade Level formula
        fk_score = (0.39 * (stats['word_count'] / stats['sentence_count']) +
                   11.8 * (syllable_count / stats['word_count']) - 15.59)
        
        # Convert to 0-1 scale (higher is better, target grade level 8-10)
        target_grade = 9
        if fk_score <= target_grade:
            return 1.0 - abs(fk_score - target_grade) / target_grade
        else:
            return max(0.0, 1.0 - (fk_score - target_grade) / 10)
    
    def _estimate_syllables(self, text: str) -> int:
        """Estimate syllable count"""
        
        # Simple syllable counting approximation
        words = re.findall(r'\b\w+\b', text.lower())
        total_syllables = 0
        
        for word in words:
            syllables = len(re.findall(r'[aeiou]+', word))
            if word.endswith('e'):
                syllables -= 1
            if syllables == 0:
                syllables = 1
            total_syllables += syllables
        
        return total_syllables
    
    def _analyze_coherence(self, text: str) -> float:
        """Analyze text coherence using sentence similarity"""
        
        sentences = re.split(r'[.!?]+', text)
        sentences = [s.strip() for s in sentences if s.strip() and len(s) > 10]
        
        if len(sentences) < 2:
            return 1.0  # Single sentence is coherent
        
        try:
            from sentence_transformers import SentenceTransformer
            model = SentenceTransformer('all-MiniLM-L6-v2')
            
            # Get sentence embeddings
            embeddings = model.encode(sentences)
            
            # Calculate pairwise similarities
            similarities = []
            for i in range(len(embeddings) - 1):
                similarity = torch.cosine_similarity(
                    torch.tensor(embeddings[i]).unsqueeze(0),
                    torch.tensor(embeddings[i + 1]).unsqueeze(0)
                )[0].item()
                similarities.append(similarity)
            
            # Average similarity as coherence measure
            coherence_score = sum(similarities) / len(similarities)
            
            # Normalize to 0-1 range
            return max(0.0, min(1.0, (coherence_score + 1) / 2))
            
        except ImportError:
            # Fallback: simple keyword overlap
            return self._simple_coherence_measure(sentences)
    
    def _simple_coherence_measure(self, sentences: List[str]) -> float:
        """Simple coherence measure based on word overlap"""
        
        if len(sentences) < 2:
            return 1.0
        
        overlaps = []
        
        for i in range(len(sentences) - 1):
            words1 = set(sentences[i].lower().split())
            words2 = set(sentences[i + 1].lower().split())
            
            if len(words1.union(words2)) > 0:
                overlap = len(words1.intersection(words2)) / len(words1.union(words2))
                overlaps.append(overlap)
        
        return sum(overlaps) / len(overlaps) if overlaps else 0.0
    
    def _check_completeness(self, text: str, stats: Dict) -> float:
        """Check text completeness"""
        
        completeness_score = 1.0
        
        # Check for truncation indicators
        truncation_indicators = [
            '...', '[truncated]', '[continued]', '(more)', 'see more'
        ]
        
        text_lower = text.lower()
        for indicator in truncation_indicators:
            if indicator in text_lower:
                completeness_score -= 0.3
                break
        
        # Check for minimum length
        if stats['word_count'] < 10:
            completeness_score -= 0.4
        elif stats['word_count'] < 50:
            completeness_score -= 0.2
        
        # Check for proper ending
        if not text.rstrip().endswith(('.', '!', '?', ':', ';')):
            completeness_score -= 0.2
        
        return max(0.0, completeness_score)
    
    def _language_detection_confidence(self, text: str) -> float:
        """Get language detection confidence"""
        
        try:
            from langdetect import detect_langs
            
            lang_probs = detect_langs(text)
            if lang_probs:
                return float(lang_probs[0].prob)
            else:
                return 0.5
                
        except Exception:
            return 0.5  # Default confidence
    
    def _check_encoding_quality(self, text: str) -> float:
        """Check for encoding issues"""
        
        quality_score = 1.0
        
        # Check for common encoding artifacts
        encoding_issues = [
            '�',  # Replacement character
            '’',  # Incorrect UTF-8 decoding
            'á', 'é', 'í', 'ó', 'ú',  # Common encoding errors
        ]
        
        for issue in encoding_issues:
            if issue in text:
                quality_score -= 0.3
                break
        
        # Check for excessive non-printable characters
        non_printable = sum(1 for c in text if ord(c) < 32 and c not in '\n\r\t')
        if non_printable > len(text) * 0.01:  # More than 1% non-printable
            quality_score -= 0.4
        
        return max(0.0, quality_score)
    
    def _calculate_noise_level(self, text: str) -> float:
        """Calculate text noise level"""
        
        noise_indicators = 0
        total_indicators = 0
        
        # Check for excessive punctuation
        punctuation_ratio = sum(1 for c in text if c in '!@#$%^&*()_+{}[]|;:,.<>?') / max(1, len(text))
        noise_indicators += min(1.0, punctuation_ratio * 10)
        total_indicators += 1
        
        # Check for excessive uppercase
        uppercase_ratio = sum(1 for c in text if c.isupper()) / max(1, len(text))
        if uppercase_ratio > 0.3:  # More than 30% uppercase
            noise_indicators += min(1.0, uppercase_ratio)
        total_indicators += 1
        
        # Check for repetitive patterns
        words = text.split()
        if len(words) > 10:
            word_frequency = {}
            for word in words:
                word_frequency[word] = word_frequency.get(word, 0) + 1
            
            max_frequency = max(word_frequency.values())
            repetition_ratio = max_frequency / len(words)
            
            if repetition_ratio > 0.1:  # Same word appears more than 10% of the time
                noise_indicators += min(1.0, repetition_ratio * 5)
        
        total_indicators += 1
        
        return noise_indicators / total_indicators if total_indicators > 0 else 0.0
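
In practice the analyzer works best as a gate in front of the pipeline: documents that score below a threshold are routed to a review queue instead of being processed. A minimal sketch is shown below; the module path follows the listing's file header, and the threshold value is illustrative and should be tuned against a labeled sample of your corpus.

# quality_gate.py (illustrative sketch)
from text_quality import TextQualityAnalyzer

analyzer = TextQualityAnalyzer()
QUALITY_THRESHOLD = 0.6  # illustrative value; tune per corpus

def should_process(text: str) -> bool:
    metrics = analyzer.analyze_quality(text)
    return metrics['overall_quality'] >= QUALITY_THRESHOLD

sample = "Ths txt has   lots of noise!!! CALL NOW!!! CALL NOW!!!"
if not should_process(sample):
    print("Routed to manual review")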

Production Deployment and Scaling

NLP Model Serving

# nlp_serving.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import asyncio
import uvicorn
import time
import logging
from typing import Any, Dict, List, Optional

# Classes defined in nlp_pipeline.py above
from nlp_pipeline import NLPResult, ProductionNLPPipeline, TextDocument

class TextProcessingRequest(BaseModel):
    texts: List[str]
    options: Optional[Dict[str, Any]] = None
    callback_url: Optional[str] = None

class ProcessingResponse(BaseModel):
    request_id: str
    status: str
    results: Optional[List[NLPResult]] = None
    error: Optional[str] = None

class NLPServingAPI:
    def __init__(self, pipeline: ProductionNLPPipeline):
        self.app = FastAPI(title="NLP Processing API")
        self.pipeline = pipeline
        self.request_queue = asyncio.Queue(maxsize=1000)
        self.results_store = {}
        self.setup_routes()
        
    def setup_routes(self):
        @self.app.post("/process", response_model=ProcessingResponse)
        async def process_texts(request: TextProcessingRequest):
            """Process texts synchronously"""
            
            try:
                # Convert texts to documents
                documents = [
                    TextDocument(
                        id=f"doc_{i}",
                        text=text,
                        metadata={'request_timestamp': time.time()}
                    )
                    for i, text in enumerate(request.texts)
                ]
                
                # Process documents
                results = await self.pipeline.process_batch(documents)
                
                return ProcessingResponse(
                    request_id=f"sync_{int(time.time())}",
                    status="completed",
                    results=results
                )
                
            except Exception as e:
                raise HTTPException(status_code=500, detail=str(e))
        
        @self.app.post("/process_async", response_model=Dict[str, str])
        async def process_texts_async(
            request: TextProcessingRequest,
            background_tasks: BackgroundTasks
        ):
            """Process texts asynchronously"""
            
            request_id = f"async_{int(time.time())}"
            
            # Add to background processing
            background_tasks.add_task(
                self._process_async,
                request_id,
                request.texts,
                request.options,
                request.callback_url
            )
            
            return {"request_id": request_id, "status": "queued"}
        
        @self.app.get("/status/{request_id}")
        async def get_processing_status(request_id: str):
            """Get processing status"""
            
            if request_id in self.results_store:
                return self.results_store[request_id]
            else:
                return {"request_id": request_id, "status": "not_found"}
        
        @self.app.get("/health")
        async def health_check():
            """Health check endpoint"""
            
            return {
                "status": "healthy",
                "queue_size": self.request_queue.qsize(),
                "performance_metrics": self.pipeline.performance_metrics
            }
    
    async def _process_async(self,
                           request_id: str,
                           texts: List[str],
                           options: Optional[Dict] = None,
                           callback_url: Optional[str] = None):
        """Process texts asynchronously"""
        
        try:
            # Store initial status
            self.results_store[request_id] = {
                "request_id": request_id,
                "status": "processing",
                "started_at": time.time()
            }
            
            # Convert texts to documents
            documents = [
                TextDocument(
                    id=f"doc_{i}",
                    text=text,
                    metadata={'request_id': request_id}
                )
                for i, text in enumerate(texts)
            ]
            
            # Process documents
            results = await self.pipeline.process_batch(documents)
            
            # Store results
            self.results_store[request_id] = {
                "request_id": request_id,
                "status": "completed",
                "results": results,
                "completed_at": time.time()
            }
            
            # Send callback if provided
            if callback_url:
                await self._send_callback(callback_url, self.results_store[request_id])
                
        except Exception as e:
            self.results_store[request_id] = {
                "request_id": request_id,
                "status": "error",
                "error": str(e),
                "failed_at": time.time()
            }
    
    async def _send_callback(self, callback_url: str, result: Dict):
        """Send callback notification"""
        
        try:
            import aiohttp
            
            async with aiohttp.ClientSession() as session:
                await session.post(callback_url, json=result)
                
        except Exception as e:
            logging.error(f"Callback failed: {e}")
    
    def start_server(self, host: str = "0.0.0.0", port: int = 8000):
        """Start the API server"""
        
        uvicorn.run(self.app, host=host, port=port)
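
Starting the service is then a matter of constructing a pipeline and handing it to the API wrapper. A minimal sketch, assuming the module names used in the listings above (the serve.py file name is illustrative):

# serve.py (illustrative sketch)
from nlp_pipeline import (
    ProductionNLPPipeline, NamedEntityProcessor, SentimentAnalysisProcessor
)
from nlp_serving import NLPServingAPI

pipeline = ProductionNLPPipeline(
    processors=[NamedEntityProcessor(), SentimentAnalysisProcessor()]
)

api = NLPServingAPI(pipeline)
api.start_server(host="0.0.0.0", port=8000)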

Best Practices Checklist

  • Implement efficient text preprocessing pipelines
  • Use appropriate model sizes for deployment constraints
  • Implement proper error handling and fallbacks
  • Monitor text quality and model performance
  • Cache embeddings and model outputs when possible (see the caching sketch after this list)
  • Implement batch processing for throughput
  • Use async processing for long-running tasks
  • Evaluate and retrain models regularly
  • Implement proper logging and monitoring
  • Handle multiple languages appropriately
  • Validate input text quality
  • Implement rate limiting and authentication
  • Use model quantization for edge deployment
  • Implement A/B testing for model improvements
  • Document processing capabilities and limitations
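
For the caching item above, a small content-addressed cache in front of the embedding model avoids recomputing vectors for repeated texts. The sketch below keeps the cache in process memory; in production you would typically swap the dict for Redis or a similar shared store. Class and file names here are illustrative.

# embedding_cache.py (illustrative sketch)
import hashlib
from sentence_transformers import SentenceTransformer

class CachedEmbedder:
    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self._cache = {}  # text hash -> embedding vector
    
    def encode(self, text: str):
        # Hash the text so long documents do not become dictionary keys
        key = hashlib.sha256(text.encode('utf-8')).hexdigest()
        if key not in self._cache:
            self._cache[key] = self.model.encode(text)
        return self._cache[key]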

Conclusion

Production NLP systems require careful orchestration of preprocessing, model serving, quality assessment, and monitoring. By implementing robust text processing pipelines, efficient model serving architectures, and comprehensive quality monitoring, you can build NLP systems that perform reliably at scale. Remember that text data is inherently noisy and varied—building systems that gracefully handle this variability while maintaining consistent performance is key to success in production environments.
