LangGraph-powered forensic analysis workflow for OSQuery data processing and security investigation.
This project provides a LangGraph-based workflow that orchestrates multiple AI-powered forensic analysis agents to automatically analyze OSQuery data for security investigations. The workflow runs multiple specialized analysis nodes in parallel, each focusing on different aspects of system forensics, then aggregates results into a comprehensive set of security reports.
📊 Conference Talk: Not-So-Secret-Agents-Deploying-AI-to-Optimize-Security-Operations.pdf
This presentation covers the architecture, implementation, and real-world deployment of AI agents for security operations optimization, including the forensics analysis workflow detailed in this repository.
- 🕸️ LangGraph Workflow: Orchestrated multi-agent analysis using LangGraph state management
- ⚡ Parallel Execution: All forensic analysis nodes run simultaneously for maximum efficiency
- 🔍 8 Specialized Nodes: Comprehensive coverage of system forensics domains
- 🤖 AI-Powered Analysis: Leverages OpenAI with LangChain integration
- 📊 Intelligent Aggregation: Synthesizes findings across all domains into actionable reports
- 🛠️ Production Ready: Built for enterprise security operations and incident response
- 🔒 Security Focused: Designed by security professionals for forensic investigations
```
START → prepare_input_data → [8 analysis nodes in parallel] → aggregate_results → [3 report nodes in parallel] → END
```
The workflow follows these phases:
- Data Preparation - Loads all OSQuery JSON files into shared state
- Parallel Analysis - Runs 8 forensic analysis nodes simultaneously
- Result Aggregation - Combines findings into comprehensive forensic report
- Specialized Report Generation - Creates 3 tailored reports in parallel:
- Quick Look Report - Rapid triage assessment for security analysts
- Executive Report - Business-focused briefing for leadership
- IOC Report - Machine-readable threat intelligence for defensive tools
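The phases above reduce to a fan-out/fan-in pattern. Independent of LangGraph, here is a minimal stdlib sketch of that shape; the analyzer names and stub return values are illustrative, not the project's real nodes:

```python
from concurrent.futures import ThreadPoolExecutor

def prepare_input_data(directory: str) -> dict:
    # Stand-in: the real node loads every OSQuery JSON file in `directory`.
    return {"system_info": "{...}", "routes": "{...}"}

def make_analyzer(domain: str):
    def analyze(file_data: dict) -> tuple:
        # Stand-in for an LLM-backed analysis node.
        return (f"{domain}_analysis", f"findings for {domain}")
    return analyze

def aggregate_results(analyses: dict) -> str:
    return "\n".join(f"{k}: {v}" for k, v in sorted(analyses.items()))

file_data = prepare_input_data("/path/to/osquery/data/")
domains = ["system_info", "network", "services"]
with ThreadPoolExecutor() as pool:           # fan-out: analyses run concurrently
    futures = [pool.submit(make_analyzer(d), file_data) for d in domains]
    analyses = dict(f.result() for f in futures)
report = aggregate_results(analyses)         # fan-in: one combined report
print(report)
```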
```bash
# Clone the repository
git clone https://github.com/redcanaryco/osquery-forensics-agents.git
cd osquery-forensics-agents

# Install using uv (recommended)
uv sync

# Or install using pip
pip install -e .
```

The project includes Jupyter notebook support for interactive forensic analysis. To set up Jupyter notebooks in VSCode:
```bash
# After running uv sync, register the kernel
uv run python -m ipykernel install --user --name osquery-forensics --display-name "OSQuery Forensics"

# Or if using the virtual environment directly
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
python -m ipykernel install --user --name osquery-forensics --display-name "OSQuery Forensics"
```

VSCode Setup:
- Open a `.ipynb` file in VSCode
- Click "Select Kernel" in the top right
- Choose "OSQuery Forensics" from the kernel list
- If not visible, select "Python Environments..." and choose the `.venv` folder
Verify Installation:
```python
# Test in a notebook cell
import forensics_agents
print("OSQuery Forensics Agents ready!")
```

The forensics agents support three LLM providers: OpenAI, Azure OpenAI, and Ollama (local). Create a `.env` file in the project root with your configuration:
```bash
# Required for OpenAI (default provider)
OPENAI_API_KEY=your-openai-api-key-here

# Optional OpenAI settings (with defaults)
OPENAI_MODEL=gpt-4o
OPENAI_ORGANIZATION=your-org-id-optional
DEFAULT_TEMPERATURE=0.1
MAX_RETRIES=3
TIMEOUT_SECONDS=60
```

```bash
# Use Azure OpenAI provider
LLM_PROVIDER=azure_openai

# Required for Azure OpenAI
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_KEY=your-api-key-here

# Optional Azure OpenAI settings (with defaults)
AZURE_OPENAI_API_VERSION=2024-10-21
DEFAULT_MODEL=gpt-4o
DEFAULT_TEMPERATURE=0.1
MAX_RETRIES=3
TIMEOUT_SECONDS=60
```

```bash
# Use Ollama provider
LLM_PROVIDER=ollama

# Required: Supported models are gpt-oss:20b or gpt-oss:120b
OLLAMA_MODEL=gpt-oss:20b

# Optional Ollama settings (with defaults)
OLLAMA_BASE_URL=http://localhost:11434
OLLAMA_TEMPERATURE=0.0
OLLAMA_TIMEOUT=300
```

| Variable | Required | Default | Description |
|---|---|---|---|
| `AZURE_OPENAI_API_KEY` | Yes¹ | - | Azure OpenAI API key |
| `AZURE_OPENAI_ENDPOINT` | Yes¹ | - | Azure OpenAI resource endpoint |
| `OPENAI_API_KEY` | Yes² | - | OpenAI API key |
| `AZURE_OPENAI_API_VERSION` | No | `2024-10-21` | Azure OpenAI API version |
| `COST_TRACKING_ENABLED` | No | `true` | Enable/disable cost tracking |
| `COST_TRACKING_LOG_CALLS` | No | `false` | Log individual LLM call costs |
| `DEFAULT_MODEL` | No | `gpt-4o` | Azure OpenAI model name |
| `DEFAULT_TEMPERATURE` | No | `0.1` | Model temperature (0.0-1.0) |
| `LLM_PROVIDER` | No | `openai` | LLM provider: `openai`, `azure_openai`, or `ollama` |
| `MAX_RETRIES` | No | `3` | Maximum API request retries |
| `OLLAMA_BASE_URL` | No | `http://localhost:11434` | Ollama server URL |
| `OLLAMA_MODEL` | No | `gpt-oss:20b` | Ollama model (`gpt-oss:20b` or `gpt-oss:120b`) |
| `OLLAMA_TEMPERATURE` | No | `0.0` | Ollama temperature setting |
| `OLLAMA_TIMEOUT` | No | `300` | Ollama request timeout in seconds |
| `OPENAI_MODEL` | No | `gpt-4o` | OpenAI model name |
| `OPENAI_ORGANIZATION` | No | - | OpenAI organization ID (optional) |
| `TIMEOUT_SECONDS` | No | `300` | Request timeout in seconds |

¹Required only when `LLM_PROVIDER=azure_openai`
²Required only when `LLM_PROVIDER=openai` (default)
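Resolving these variables into provider-specific settings can be sketched as follows. This mirrors the defaults in the table above, but it is only an illustrative sketch; the project's actual configuration code may differ in detail:

```python
import os

def resolve_llm_settings() -> dict:
    """Pick provider-specific settings from the environment, applying the table's defaults."""
    provider = os.getenv("LLM_PROVIDER", "openai")
    if provider == "openai":
        return {
            "provider": provider,
            "api_key": os.getenv("OPENAI_API_KEY"),
            "model": os.getenv("OPENAI_MODEL", "gpt-4o"),
            "temperature": float(os.getenv("DEFAULT_TEMPERATURE", "0.1")),
        }
    if provider == "azure_openai":
        return {
            "provider": provider,
            "api_key": os.getenv("AZURE_OPENAI_API_KEY"),
            "endpoint": os.getenv("AZURE_OPENAI_ENDPOINT"),
            "api_version": os.getenv("AZURE_OPENAI_API_VERSION", "2024-10-21"),
            "model": os.getenv("DEFAULT_MODEL", "gpt-4o"),
        }
    if provider == "ollama":
        return {
            "provider": provider,
            "base_url": os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"),
            "model": os.getenv("OLLAMA_MODEL", "gpt-oss:20b"),
            "temperature": float(os.getenv("OLLAMA_TEMPERATURE", "0.0")),
        }
    raise ValueError(f"Unsupported LLM_PROVIDER: {provider}")

os.environ["LLM_PROVIDER"] = "ollama"
settings = resolve_llm_settings()
print(settings["model"])  # gpt-oss:20b
```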
```python
from forensics_agents import run_forensics_analysis

# Run complete forensics workflow
results = run_forensics_analysis('/path/to/osquery/data/')

# Access all available reports
print(results['final_report'])       # Comprehensive forensics analysis
print(results['quick_look_report'])  # Rapid triage assessment
print(results['executive_report'])   # Business-focused briefing
print(results['ioc_report'])         # Machine-readable threat intelligence

# Access individual analysis results
print(results['system_info_analysis'])
print(results['hardware_drivers_analysis'])
```

```python
from forensics_agents import build_forensics_graph

# Build workflow for custom execution
workflow = build_forensics_graph()

# Initialize state
initial_state = {
    "directory": "/path/to/osquery/data/",
    "file_data": {},
    # ... other state fields
}

# Execute workflow
results = workflow.invoke(initial_state)

# Stream execution for real-time monitoring
for chunk in workflow.stream(initial_state):
    print(f"Node completed: {list(chunk.keys())}")
```

The LangGraph workflow can be visualized as a Mermaid diagram:

```python
from forensics_agents import build_forensics_graph
from IPython.display import Image, display

workflow = build_forensics_graph()
display(Image(workflow.get_graph().draw_mermaid_png()))
```

The workflow expects OSQuery JSON files in a directory structure:
```
investigation_data/
# System Information
├── system_info.json                  # Basic system information
├── os_version.json                   # Operating system version
├── uptime.json                       # System uptime
├── disk_info.json                    # Disk information
├── time.json                         # System time
├── bitlocker_info.json               # BitLocker encryption status
├── windows_crashes.json              # System crash information
# Hardware and Drivers
├── drivers.json                      # Non-Microsoft/unsigned drivers
# Users and Authentication
├── users.json                        # User accounts
├── user_groups.json                  # User-group mappings
├── logged_in_users.json              # Current logon sessions
├── groups.json                       # System groups
├── logon_sessions.json               # All logon sessions
# Network Configuration
├── arp_cache.json                    # ARP cache entries
├── interface_details.json            # Network interface details
├── interface_addresses.json          # Interface IP addresses
├── routes.json                       # Routing table
├── listening_ports.json              # Listening network ports
├── process_open_sockets.json         # Process network connections
├── firewall_rules.json               # Windows firewall rules
├── firewall_profiles.json            # Firewall profiles
├── etc_hosts.json                    # Hosts file entries
# Services and Processes
├── services.json                     # System services
├── processes.json                    # Running processes
├── startup_items.json                # Startup programs
├── scheduled_tasks.json              # Scheduled tasks
├── autoexec.json                     # Autoexec entries
# Software and Artifacts
├── programs.json                     # Installed programs
├── patches.json                      # Installed patches
├── muicache.json                     # MUI cache registry entries
├── userassist.json                   # UserAssist registry entries
├── chrome_extensions.json            # Chrome extensions
# WMI and Persistence
├── wmi_script_event_consumers.json   # WMI script event consumers
├── wmi_event_filters.json            # WMI event filters
├── wmi_filter_consumer_binding.json  # WMI filter-consumer bindings
├── wmi_cli_event_consumers.json      # WMI CLI event consumers
# File System
├── shared_resources.json             # Network shares
├── pipes.json                        # Named pipes
└── recycle_bin.json                  # Recycle bin contents
```
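During the Data Preparation phase, each of these files is loaded into shared state keyed by filename. A minimal sketch of such a loader follows; the project's actual `data_loader` may differ:

```python
import json
import tempfile
from pathlib import Path

def load_osquery_files(directory: str) -> dict:
    """Read every *.json file in the investigation directory into a name -> raw JSON text map."""
    file_data = {}
    for path in sorted(Path(directory).glob("*.json")):
        try:
            # Keep the raw text; each analysis node parses only what it needs.
            file_data[path.stem] = path.read_text(encoding="utf-8")
        except OSError as exc:
            print(f"Skipping unreadable file {path}: {exc}")
    return file_data

# Demonstrate with a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "system_info.json").write_text(json.dumps([{"hostname": "WS01"}]))
    data = load_osquery_files(tmp)
print(sorted(data))  # ['system_info']
```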
Use these OSQuery commands to generate the required data:
| OSQuery SQL Command | Output File |
|---|---|
| `SELECT * FROM system_info;` | `system_info.json` |
| `SELECT * FROM os_version;` | `os_version.json` |
| `SELECT * FROM uptime;` | `uptime.json` |
| `SELECT * FROM disk_info;` | `disk_info.json` |
| `SELECT * FROM time;` | `time.json` |
| `SELECT * FROM bitlocker_info;` | `bitlocker_info.json` |
| `SELECT * FROM windows_crashes;` | `windows_crashes.json` |
| OSQuery SQL Command | Output File |
|---|---|
| `SELECT device_name, device_id AS id, class, service_key FROM drivers WHERE NOT (provider = 'Microsoft' AND signed = 1) AND NOT (class = '' AND service_key = '') ORDER BY device_name ASC;` | `drivers.json` |
| OSQuery SQL Command | Output File |
|---|---|
| `SELECT * FROM users;` | `users.json` |
| `SELECT * FROM groups JOIN user_groups USING (gid) JOIN users USING (uid);` | `user_groups.json` |
| `SELECT time, user, type, tty, pid FROM logged_in_users ORDER BY time;` | `logged_in_users.json` |
| `SELECT * FROM groups;` | `groups.json` |
| `SELECT * FROM logon_sessions;` | `logon_sessions.json` |
| OSQuery SQL Command | Output File |
|---|---|
| `SELECT * FROM arp_cache;` | `arp_cache.json` |
| `SELECT * FROM interface_details;` | `interface_details.json` |
| `SELECT * FROM interface_addresses;` | `interface_addresses.json` |
| `SELECT * FROM routes;` | `routes.json` |
| `SELECT * FROM listening_ports;` | `listening_ports.json` |
| `SELECT * FROM process_open_sockets;` | `process_open_sockets.json` |
| `SELECT * FROM win_firewall_rules;` | `firewall_rules.json` |
| `SELECT * FROM windows_firewall_profile;` | `firewall_profiles.json` |
| `SELECT * FROM etc_hosts;` | `etc_hosts.json` |
| OSQuery SQL Command | Output File |
|---|---|
| `SELECT * FROM services;` | `services.json` |
| `SELECT * FROM processes;` | `processes.json` |
| `SELECT * FROM startup_items;` | `startup_items.json` |
| `SELECT * FROM scheduled_tasks;` | `scheduled_tasks.json` |
| `SELECT * FROM autoexec;` | `autoexec.json` |
| OSQuery SQL Command | Output File |
|---|---|
| `SELECT * FROM programs;` | `programs.json` |
| `SELECT * FROM patches;` | `patches.json` |
| `SELECT key, path, name, data, mtime FROM registry WHERE key LIKE 'HKEY_USERS\\%\\Software\\Classes\\Local Settings\\Software\\Microsoft\\Windows\\Shell\\MuiCache%';` | `muicache.json` |
| `SELECT key, path, name, data, mtime FROM registry WHERE key LIKE 'HKEY_USERS\\%\\Software\\Microsoft\\Windows\\CurrentVersion\\Explorer\\UserAssist%';` | `userassist.json` |
| `SELECT * FROM chrome_extensions;` | `chrome_extensions.json` |
| OSQuery SQL Command | Output File |
|---|---|
| `SELECT * FROM wmi_script_event_consumers;` | `wmi_script_event_consumers.json` |
| `SELECT * FROM wmi_event_filters;` | `wmi_event_filters.json` |
| `SELECT * FROM wmi_filter_consumer_binding;` | `wmi_filter_consumer_binding.json` |
| `SELECT * FROM wmi_cli_event_consumers;` | `wmi_cli_event_consumers.json` |
| OSQuery SQL Command | Output File |
|---|---|
| `SELECT * FROM shared_resources;` | `shared_resources.json` |
| `SELECT * FROM pipes;` | `pipes.json` |
| `SELECT path, directory, filename, size, atime, mtime, ctime, btime FROM file WHERE path LIKE 'C:\\$Recycle.Bin%';` | `recycle_bin.json` |
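One way to capture these queries is `osqueryi` in JSON output mode on the target host. A sketch for a few of the files above (file names match the expected directory layout; extend the same pattern for the remaining queries):

```bash
mkdir -p investigation_data
osqueryi --json "SELECT * FROM system_info;" > investigation_data/system_info.json
osqueryi --json "SELECT * FROM os_version;" > investigation_data/os_version.json
osqueryi --json "SELECT * FROM processes;" > investigation_data/processes.json
```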
List Available Kernels:

```bash
jupyter kernelspec list
```

Remove Kernel (if needed):

```bash
jupyter kernelspec remove osquery-forensics
```

Reinstall Kernel:

```bash
uv run python -m ipykernel install --user --name osquery-forensics --display-name "OSQuery Forensics" --force
```

Start Jupyter Lab:

```bash
uv run jupyter lab
```

Or use VSCode with the Python extension:
- Install the Jupyter extension in VSCode
- Open any `.ipynb` file
- Select the "OSQuery Forensics" kernel from the kernel picker

If the kernel doesn't appear in VSCode:
- Ensure the Python extension is installed
- Reload the VSCode window (`Cmd/Ctrl + Shift + P` → "Developer: Reload Window")
- Check that the kernel is registered: `jupyter kernelspec list`
- Try selecting "Python Environments..." and browse to the `.venv` folder
See the comprehensive example in `examples/langgraph_forensics_workflow.ipynb`, which demonstrates:
- Workflow setup and configuration
- Sample data creation and analysis
- Result visualization and interpretation
- Advanced usage patterns
- Streaming execution monitoring
- Interactive Jupyter notebook usage
For interactive development and visualization of the forensics workflow, you can use LangGraph Studio:
Ensure you have the LangGraph CLI installed (included in project dependencies):

```bash
# Install project dependencies (includes langgraph-cli)
uv sync
```

- Start LangGraph Studio from the project root:

  ```bash
  uv run langgraph dev
  ```

- Access the Studio Interface:
  - LangGraph Studio will start and automatically open in your browser
  - The studio will load the workflow configuration from `langgraph.json`

- Using the Studio:
  - Input Forensics Data Folder: Under the Input section in LangGraph Studio, specify the path to your forensics data folder. You can use the `data` directory in this project as an example
  - Visualize Workflow: See the complete forensics analysis graph
  - Interactive Execution: Run the workflow step-by-step with real data
  - Debug Nodes: Inspect individual analysis nodes and their outputs
  - Test Scenarios: Try different OSQuery data inputs
  - Monitor State: Watch how data flows through the analysis pipeline
- Graph Visualization: Interactive view of all analysis nodes and their connections
- Live Execution: Run forensics analysis with immediate feedback
- State Inspector: Examine workflow state at each step
- Node Testing: Test individual analysis components
- Data Explorer: Browse OSQuery input data and analysis results
The LangGraph Studio configuration is defined in `langgraph.json`:
- Workflow entry points and execution flow
- Node definitions and dependencies
- Environment variable requirements
- Studio-specific display settings
This provides an ideal environment for developing new analysis nodes, testing forensics workflows, and understanding the complete analysis process.
```
osquery-forensics-agents/
├── src/forensics_agents/
│   ├── __init__.py              # Main package exports
│   ├── config.py                # Configuration management
│   ├── langgraph_workflow/      # LangGraph implementation
│   │   ├── __init__.py
│   │   ├── state.py             # Workflow state definition
│   │   ├── nodes.py             # Analysis node functions
│   │   ├── workflow.py          # Graph construction
│   │   └── utils.py             # Helper utilities
│   └── utils/                   # Shared utilities
│       ├── __init__.py
│       └── data_loader.py
├── tests/                       # Test suite
│   ├── __init__.py
│   └── test_config.py
├── examples/                    # Usage examples
│   └── langgraph_forensics_workflow.ipynb
├── docs/                        # Documentation
├── data/                        # Sample data
├── images/                      # Documentation images
│   ├── LangGraph-Workflow.png
│   └── OSQuery-Forensics-Mapping.png
├── LICENSE                      # License file
├── langgraph.json               # LangGraph configuration
├── pyproject.toml               # Project configuration
├── uv.lock                      # UV lock file
└── README.md
```
```bash
# Clone and setup
git clone https://github.com/redcanaryco/osquery-forensics-agents.git
cd osquery-forensics-agents

# Install development dependencies
uv sync --dev

# Run tests
pytest

# Code quality checks
ruff check .
mypy src/
```

To add a new forensics analysis node:
- Define the analysis function in `src/forensics_agents/langgraph_workflow/nodes.py`:

  ```python
  from typing import Any, Dict

  def analyze_new_domain(state: ForensicsAnalysisState) -> Dict[str, Any]:
      """Analyze new forensics domain"""
      try:
          file_data = state["file_data"]
          llm = create_llm_client()

          # Extract relevant data
          new_data = file_data.get('new_domain_json_str')

          # Create prompt and analyze
          # ... analysis logic producing `response` from the LLM ...

          return {"new_domain_analysis": response.content}
      except Exception as e:
          return handle_node_error("analyze_new_domain", e, state)
  ```

- Update the workflow in `src/forensics_agents/langgraph_workflow/workflow.py`:

  ```python
  # Add node to graph
  forensics_graph.add_node("analyze_new_domain", analyze_new_domain)

  # Connect to data preparation and aggregation
  forensics_graph.add_edge("prepare_input_data", "analyze_new_domain")
  forensics_graph.add_edge("analyze_new_domain", "aggregate_results")
  ```

- Update the state model in `src/forensics_agents/langgraph_workflow/state.py`:

  ```python
  from typing import Optional, TypedDict

  class ForensicsAnalysisState(TypedDict):
      # ... existing fields ...
      new_domain_analysis: Optional[str]
  ```

- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Make your changes following the LangGraph patterns
- Add tests for new functionality
- Ensure all tests pass (`pytest`)
- Ensure code quality checks pass (`ruff check . && mypy src/`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
The LangGraph workflow maximizes efficiency by running analysis nodes in parallel:
- Concurrent Analysis: All 8 forensic domains analyzed simultaneously
- Shared State: Efficient state management reduces memory overhead
- Error Isolation: Node failures don't impact other analyses
- Streaming Support: Real-time progress monitoring available
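Error isolation typically follows a catch-and-record pattern. The helper below is a hypothetical sketch of what a `handle_node_error`-style wrapper might do, not the project's exact implementation:

```python
from typing import Callable

def isolate(node_name: str, fn: Callable[[dict], dict]) -> Callable[[dict], dict]:
    """Wrap a node so one failure becomes recorded state instead of killing the run."""
    def wrapped(state: dict) -> dict:
        try:
            return fn(state)
        except Exception as exc:
            # Record the failure; sibling analyses keep running.
            return {"errors": [f"{node_name}: {exc}"],
                    f"{node_name}_analysis": None}
    return wrapped

def bad_node(state: dict) -> dict:
    raise RuntimeError("malformed JSON input")

result = isolate("analyze_network", bad_node)({"file_data": {}})
print(result["errors"])  # ['analyze_network: malformed JSON input']
```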
For large-scale deployments:
```python
# Configure for high-throughput scenarios
import asyncio

from forensics_agents import run_forensics_analysis

async def analyze_multiple_endpoints(endpoint_data_paths):
    tasks = []
    for path in endpoint_data_paths:
        task = asyncio.create_task(
            asyncio.to_thread(run_forensics_analysis, path)
        )
        tasks.append(task)
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

# Process 100+ endpoints concurrently
results = asyncio.run(analyze_multiple_endpoints(endpoint_paths))
```

- API Keys: Store LLM credentials securely using environment variables or key vaults
- Data Privacy: OSQuery data may contain sensitive system information
- Network Security: Use secure connections for all API communications
- Audit Logging: All forensic analyses are logged with timestamps and error details
- Access Control: Implement appropriate RBAC for production deployments
This project is licensed under the BSD 3-Clause License - see the LICENSE file for details.
- 📖 Documentation: LangGraph Workflow Examples
- 🐛 Bug Reports: GitHub Issues
- 💬 Discussions: GitHub Discussions
- 📧 Contact: [email protected]
- LangGraph for the workflow orchestration framework
- LangChain for LLM provider integrations
- OSQuery for the excellent system introspection framework
- Red Canary's detection engineering team for forensics expertise
Built with ❤️ by the Red Canary team using LangGraph 🕸️

