# skills.ws — Full Skill Index

> 83 agent skills for AI coding assistants.

## ab-testing (v1.0.0)

Category: conversion
Description: A/B test design, statistical analysis, sample size calculation, experiment prioritization, and results interpretation.

Features:
- Hypothesis generation frameworks
- Sample size and duration calculators
- Statistical significance analysis
- Experiment prioritization (ICE, RICE, PIE)
- Multi-variant test design
- Results interpretation and documentation

Use Cases:
- Design an A/B test for a pricing page
- Calculate required sample size for significance
- Prioritize a backlog of experiment ideas
- Interpret test results and make ship/no-ship decisions

# A/B Testing

## Workflow

### 1. Hypothesis Generation

**Format:** If we [change], then [metric] will [improve/decrease] by [amount], because [rationale].

**Example:** If we shorten the signup form from 5 fields to 3, then signup completion rate will increase by 15%, because friction reduction at high-intent moments increases conversion.

### 2. Prioritization

**ICE framework (quick):**

| Factor | Score | Definition |
|--------|-------|------------|
| Impact | 1-10 | How much will it move the metric? |
| Confidence | 1-10 | How sure are we it'll work? |
| Ease | 1-10 | How fast/cheap to implement? |
| **ICE Score** | (I + C + E) / 3 | Average of the three scores |

**RICE framework (more rigorous):**

| Factor | Definition |
|--------|-----------|
| Reach | How many users affected per quarter? |
| Impact | Expected effect size (0.25, 0.5, 1, 2, 3) |
| Confidence | % sure (100%, 80%, 50%) |
| Effort | Person-weeks to implement |
| **RICE Score** | (R × I × C) / E |

### 3. Sample Size Calculation

**Formula:**

```
n = (Z_α/2 × √(2p̄(1-p̄)) + Z_β × √(p₁(1-p₁) + p₂(1-p₂)))² / (p₂ - p₁)²

Where:
  n     = sample size per variant
  p₁    = baseline conversion rate
  p₂    = expected conversion rate (baseline × (1 + MDE))
  p̄     = (p₁ + p₂) / 2
  Z_α/2 = 1.96 (for 95% confidence)
  Z_β   = 0.84 (for 80% power)
```

**Quick reference table (approximate, 95% confidence, 80% power):**

| Baseline rate | MDE (relative) | Sample per variant |
|--------------|----------------|-------------------|
| 2% | 10% | 78,000 |
| 2% | 20% | 20,000 |
| 5% | 10% | 30,000 |
| 5% | 20% | 7,700 |
| 10% | 10% | 14,300 |
| 10% | 20% | 3,700 |
| 20% | 10% | 6,300 |
| 20% | 20% | 1,600 |

**Test duration (control plus one variant):**

```
Days needed = (Sample per variant × 2) / Daily traffic to test page
```

Minimum: 7 days (capture day-of-week effects). Maximum: 4 weeks (avoid novelty decay).

### 4. Test Design

**Rules:**
- One hypothesis per test
- Randomly assign users, not sessions (avoid flickering)
- Use the same metric definition for control and variant
- Define primary metric AND guardrail metrics before launch
- Don't peek at results before reaching sample size

**Guardrail metrics (always monitor):**
- Page load time (variant shouldn't be slower)
- Error rate
- Revenue per user (don't increase signups but tank revenue)
- Bounce rate

### 5. Statistical Analysis

**Frequentist approach (standard):**

```python
import numpy as np
from scipy import stats

# Results
control = {'visitors': 5000, 'conversions': 250}  # 5.0%
variant = {'visitors': 5000, 'conversions': 295}  # 5.9%

p1 = control['conversions'] / control['visitors']
p2 = variant['conversions'] / variant['visitors']

# Two-proportion z-test with a pooled standard error
p_pool = (control['conversions'] + variant['conversions']) / (control['visitors'] + variant['visitors'])
se = np.sqrt(p_pool * (1 - p_pool) * (1/control['visitors'] + 1/variant['visitors']))
z = (p2 - p1) / se
p_value = 2 * (1 - stats.norm.cdf(abs(z)))

lift = (p2 - p1) / p1 * 100
# Half-width of the 95% CI on the absolute difference (unpooled standard error)
ci_95 = 1.96 * np.sqrt(p1*(1-p1)/control['visitors'] + p2*(1-p2)/variant['visitors'])

print(f"Control: {p1:.3%}")
print(f"Variant: {p2:.3%}")
print(f"Lift: {lift:.1f}%")
print(f"95% CI: [{(p2-p1-ci_95)/p1*100:.1f}%, {(p2-p1+ci_95)/p1*100:.1f}%]")
print(f"p-value: {p_value:.4f}")
print(f"Significant: {'Yes' if p_value < 0.05 else 'No'}")
```

**Bayesian approach (when you want probability of being better):**

```python
from scipy.stats import beta

# Reuses `control` and `variant` from the frequentist example above.
# Beta(1, 1) prior updated with observed conversions and non-conversions.
a_alpha = control['conversions'] + 1
a_beta = control['visitors'] - control['conversions'] + 1
b_alpha = variant['conversions'] + 1
b_beta = variant['visitors'] - variant['conversions'] + 1

# Monte Carlo simulation
samples_a = beta.rvs(a_alpha, a_beta, size=100000)
samples_b = beta.rvs(b_alpha, b_beta, size=100000)
prob_b_better = (samples_b > samples_a).mean()

print(f"P(variant > control): {prob_b_better:.1%}")
```

### 6. Ship / No-Ship Decision

| Scenario | Decision |
|----------|----------|
| p < 0.05 AND lift > MDE AND guardrails OK | Ship |
| p < 0.05 AND lift > 0 but < MDE | Ship if no cost, otherwise iterate |
| p > 0.05 AND lift direction positive | Inconclusive — extend or iterate |
| p < 0.05 AND lift negative | Kill variant |
| Guardrail metric degraded | Kill variant regardless of primary metric |

### 7. Documentation Template

```markdown
## Test: [Name]

**Hypothesis:** If we [change], then [metric] will [change] by [amount]
**Primary metric:** [metric name]
**Guardrails:** [metric 1, metric 2]
**Sample size:** [X per variant]
**Duration:** [start] to [end]

### Results

| Metric | Control | Variant | Lift | p-value | Sig? |
|--------|---------|---------|------|---------|------|
| Primary | X% | Y% | +Z% | 0.XX | Y/N |

### Decision: Ship / Kill / Iterate

**Reasoning:** [Why]
**Next test:** [What we learned and what to try next]
```

## Common Mistakes

- Stopping early because results "look significant" (peeking inflates false positives)
- Running too many variants (splits traffic, takes forever to reach significance)
- Testing tiny changes on low-traffic pages (will never reach significance)
- Not segmenting results (variant might win overall but lose on mobile)
- Ignoring practical significance (statistically significant 0.1% lift isn't worth shipping)

## accounting-finance (v1.0.0)

Category: operations
Description: Financial modeling, bookkeeping automation, invoicing workflows, tax compliance checklists, and P&L analysis for SMEs and startups.

Features:
- P&L statement analysis and generation
- Cash flow forecasting models
- Invoice automation workflows
- Tax compliance checklists by jurisdiction
- Revenue recognition patterns
- Budget vs actual variance analysis

Use Cases:
- Build a monthly P&L analysis template
- Set up automated invoicing workflows
- Create a cash flow forecast model
- Design a tax compliance checklist for EU SMEs

# Accounting & Finance

## Workflow

### 1. P&L Structure

| Line item | Calculation | Watch for |
|-----------|-------------|-----------|
| Revenue | MRR × months + one-time | Revenue recognition timing |
| COGS | Hosting + support + onboarding | Should be < 30% of revenue for SaaS |
| Gross margin | Revenue - COGS | Target: 70-80% for SaaS |
| Operating expenses | Sales + Marketing + R&D + G&A | Break down by department |
| EBITDA | Gross margin - OpEx | Profitability indicator |
| Net income | EBITDA - interest - taxes - depreciation - amortization | Bottom line |

**Monthly P&L review checklist:**
- [ ] Revenue matches billing system (reconcile ±1%)
- [ ] COGS categorized correctly (not mixed with OpEx)
- [ ] Headcount costs allocated to correct department
- [ ] One-time costs flagged and excluded from run-rate
- [ ] MoM and YoY comparison included

### 2. Cash Flow Forecasting

**13-week rolling forecast (the standard):**

```
Week | Starting cash | + Revenue collected | - Payroll | - Vendors | - Tax | = Ending cash
1    | 150,000       | 45,000              | 30,000    | 8,000     | 0     | 157,000
2    | 157,000       | 12,000              | 0         | 5,000     | 0     | 164,000
...
```

**Key rules:**
- Use cash collected, not revenue recognized
- Payroll on actual pay dates (biweekly or monthly)
- Include tax payments on due dates
- Flag weeks where ending cash < 2 months of burn
- Update weekly — stale forecasts are useless

**Burn rate calculation:**

```
Monthly burn = Total cash spent in month (excluding one-time)
Runway (months) = Current cash balance / Monthly burn
```

Runway < 6 months = fundraise or cut costs immediately.

### 3. Unit Economics

| Metric | Formula | SaaS benchmark |
|--------|---------|----------------|
| CAC | Total sales & marketing spend / New customers | Varies by segment |
| LTV | ARPU × Gross margin % × (1 / Monthly churn rate) | 3-5x CAC minimum |
| LTV:CAC | LTV / CAC | > 3:1 healthy |
| Payback period | CAC / (ARPU × Gross margin %) | < 12 months |
| Magic number | Net new ARR / Prior quarter S&M spend | > 0.75 = efficient |

### 4. Invoice Automation

**Invoice workflow:**
1. Contract signed → create invoice record
2. Invoice generated → send on billing date
3. Payment due → track aging (net 30/60)
4. Overdue → automated reminder sequence:
   - Day 1 past due: friendly reminder
   - Day 7: second notice with payment link
   - Day 14: escalation to account manager
   - Day 30: final notice, flag for collections

**Invoice must include:**
- Unique invoice number (sequential)
- Your company legal name, address, VAT number
- Client company name, address, VAT number
- Line items with descriptions, quantities, unit prices
- Subtotal, tax rate, tax amount, total
- Payment terms and bank details
- Issue date and due date

### 5. EU VAT Compliance

| Scenario | VAT treatment |
|----------|---------------|
| B2B within same EU country | Charge local VAT |
| B2B cross-border EU | Reverse charge (0% VAT, buyer reports) |
| B2C within EU | Charge destination country VAT rate (OSS) |
| B2C outside EU | No EU VAT |
| B2B outside EU | No VAT (export) |

**OSS (One-Stop Shop)** — register in one EU country, report all EU B2C sales there.

**VAT rates (major markets):**

| Country | Standard rate |
|---------|-------------|
| Luxembourg | 17% |
| France | 20% |
| Germany | 19% |
| Netherlands | 21% |
| Spain | 21% |
| Italy | 22% |
| Ireland | 23% |

### 6. Revenue Recognition (ASC 606 / IFRS 15)

**5-step model:**
1. Identify the contract
2. Identify performance obligations
3. Determine transaction price
4. Allocate price to obligations
5. Recognize revenue when obligation is satisfied

**SaaS specifics:**
- Monthly subscription: recognize monthly as service delivered
- Annual prepayment: recognize 1/12 each month (rest is deferred revenue)
- Setup fees: defer and recognize over contract term (usually)
- Usage-based: recognize as usage occurs

### 7. Budget vs Actual

**Variance analysis template:**

| Category | Budget | Actual | Variance (+ = favorable) | % Var (actual vs budget) | Flag |
|----------|--------|--------|----------|-------|------|
| Revenue | 100,000 | 95,000 | -5,000 | -5% | Review |
| COGS | 25,000 | 23,000 | +2,000 | -8% | OK |
| Marketing | 30,000 | 38,000 | -8,000 | +27% | Alert |
| R&D | 40,000 | 41,000 | -1,000 | +3% | OK |

**Rules:**
- Flag variances > 10% for review
- Flag variances > 20% for immediate action
- Always explain WHY, not just WHAT
- Reforecast quarterly based on actuals

## affiliate-marketing (v1.0.0)

Category: growth
Description: Affiliate program design, commission structures, partner recruitment, tracking implementation, and performance optimization.

Features:
- Affiliate program structure design
- Commission model optimization (CPA, CPS, tiered)
- Partner recruitment and onboarding
- Tracking pixel and attribution setup
- Affiliate content and creative guidelines
- Performance reporting and payout automation

Use Cases:
- Launch an affiliate program from scratch
- Design a tiered commission structure
- Set up affiliate tracking with proper attribution
- Recruit and onboard the first 50 affiliates

# Affiliate Marketing

## Workflow

### 1. Program Structure

**In-house vs network:**

| Factor | In-house | Network (ShareASale, Impact, etc.) |
|--------|----------|-----------------------------------|
| Setup cost | Higher (build tracking) | Lower (platform fee) |
| Commission fee | None (just payouts) | 20-30% on top of commission |
| Control | Full | Limited by platform rules |
| Recruitment | You do it all | Access to affiliate marketplace |
| Tracking | Custom or SaaS (Rewardful, FirstPromoter) | Built-in |
| Best for | SaaS, high-value products | E-commerce, consumer products |

**Recommendation:** Start in-house with a SaaS tracker (Rewardful, PartnerStack, FirstPromoter). Move to a network only if you need high-volume affiliate recruitment.

### 2. Commission Models

| Model | Structure | Best for | Example |
|-------|-----------|----------|---------|
| CPA (Cost Per Acquisition) | Flat fee per signup/sale | SaaS free trials, lead gen | $50 per paid signup |
| CPS (Cost Per Sale) | % of sale value | E-commerce, variable pricing | 20% of first purchase |
| Recurring | % of subscription revenue | SaaS with monthly billing | 20% recurring for 12 months |
| Tiered | Increasing % at volume thresholds | Motivating top performers | 20% (1-10), 25% (11-50), 30% (51+) |
| Hybrid | Base CPA + recurring bonus | Balanced motivation | $25 CPA + 10% recurring |

**Setting commission rates:**
- Calculate your CAC from other channels
- Set affiliate commission at 30-50% of your average CAC (profitable from day 1)
- For SaaS: recurring commission should cap at 12 months (prevents perpetual liability)
- Review rates quarterly based on affiliate-sourced LTV vs other channels

### 3. Tracking Implementation

**Server-side tracking (recommended — survives ad blockers):**

```javascript
// Assumes an Express app with cookie-parser registered, so req.cookies is populated.

// On referral click — store affiliate ID
app.get('/ref/:affiliateId', (req, res) => {
  res.cookie('affiliate_id', req.params.affiliateId, {
    maxAge: 30 * 24 * 60 * 60 * 1000, // 30-day cookie
    httpOnly: true,
    secure: true,
    sameSite: 'lax'
  });
  res.redirect('/');
});

// On conversion — attribute to affiliate
app.post('/api/signup', async (req, res) => {
  // newUser and plan come from your own signup logic (not shown here)
  const affiliateId = req.cookies.affiliate_id;
  if (affiliateId) {
    await recordConversion({
      affiliateId,
      customerId: newUser.id,
      value: plan.price,
      type: 'signup'
    });
  }
});
```

**Cookie window standards:**

| Product type | Cookie window | Rationale |
|-------------|--------------|-----------|
| SaaS | 30-90 days | Longer consideration cycle |
| E-commerce | 7-30 days | Shorter purchase cycle |
| High-ticket | 90-180 days | Enterprise sales cycle |

**Attribution rules:**
- Last click wins (standard, simplest)
- First click wins (rewards discovery)
- Linear (split credit) — complex, avoid unless
needed
- Direct traffic always overrides affiliate (prevent self-referral fraud)

### 4. Partner Recruitment

**Ideal affiliate profiles:**

| Type | Characteristics | Approach |
|------|----------------|----------|
| Content creators | Blog/YouTube in your niche | Outreach with free product + custom commission |
| Review sites | G2, Capterra, niche review blogs | Ensure listing, offer affiliate tracking |
| Influencers | Social following in target audience | Custom landing page + higher commission |
| Existing customers | Happy users with audience | In-app referral prompt + affiliate upgrade option |
| Agencies | Serve your target market | Reseller/referral hybrid program |

**Recruitment outreach template:**

```
Subject: Partner with [Product] — [X]% commission

Hi [Name],

I've been following your content on [specific topic] — [genuine compliment].

We're building [Product], which helps [audience] with [value prop]. I think it'd be a natural fit for your audience.

Our affiliate program:
- [X]% recurring commission (or flat $X per signup)
- [X]-day cookie window
- Dedicated affiliate dashboard
- Custom landing pages and creatives

Interested in trying it out? Happy to set you up with a free account and walk through the program.

[Name]
```

### 5. Compliance

**FTC disclosure requirements:**
- Affiliates MUST disclose the relationship ("I earn a commission if you buy through my link")
- Disclosure must be clear, conspicuous, and BEFORE the link
- "Ad" or "Sponsored" labels on social media
- Include disclosure guidelines in your affiliate agreement

**Fraud prevention:**
- Monitor for self-referrals (same IP for click and conversion)
- Flag unusually high conversion rates (> 20% = suspicious)
- Require minimum cookie age (> 1 second between click and conversion)
- Ban coupon/deal sites from bidding on your brand keywords
- Review top affiliates manually quarterly

### 6. Performance Optimization

**Monthly affiliate dashboard:**

| Metric | Calculate | Benchmark |
|--------|-----------|-----------|
| Active affiliates | Affiliates with ≥1 conversion/month | 10-20% of total |
| Revenue per affiliate | Total affiliate revenue / Active affiliates | Track trend |
| Conversion rate | Conversions / Clicks | 2-5% (depends on niche) |
| EPC (Earnings Per Click) | Total commissions / Total clicks | $0.50-2.00 |
| Average commission | Total paid / Total conversions | Track vs CAC |
| Affiliate-sourced % | Affiliate revenue / Total revenue | 10-30% target |

**Top performer strategy:**
- Identify top 10% of affiliates by revenue
- Offer exclusive commission rates (+5-10%)
- Provide early access to new features for content
- Quarterly check-in call with affiliate manager
- Custom creatives and co-branded landing pages

## ai-agent-building (v1.0.0)

Category: dev
Description: Multi-agent orchestration — CrewAI, LangGraph, AutoGen patterns, tool use, memory systems, and production deployment.

Features:
- CrewAI agent and task configuration
- LangGraph stateful workflow patterns
- Tool use and function calling patterns
- Memory systems: short-term, long-term, episodic
- Multi-agent orchestration and delegation
- Production deployment with observability

Use Cases:
- Build a multi-agent research pipeline
- Create an agent with persistent memory
- Orchestrate agents with LangGraph workflows
- Deploy agents to production with monitoring

---
name: ai-agent-building
description: "Production AI agent development — LangGraph, CrewAI, tool design, memory, RAG pipelines, multi-agent patterns, evaluation, and safety."
---

# AI Agent Building

## Agent Architecture Fundamentals

An AI agent is an LLM that can take actions. That's it. Everything else is engineering around that core loop:

```
Observe → Think → Act → Observe → Think → Act → ...
```

The complexity comes from: which actions? how to recover from failures? how to know when to stop?
how to not bankrupt you on API calls? --- ## LangGraph: State Machine Agents LangGraph is the production-grade choice for complex agents. It gives you explicit control flow, checkpointing, and human-in-the-loop — things you need in production but that simple chains don't offer. ### Basic Agent with Tool Calling ```python # pip install langgraph langchain-openai langgraph-checkpoint-sqlite from typing import Annotated, TypedDict from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages from langgraph.prebuilt import ToolNode from langchain_openai import ChatOpenAI from langchain_core.tools import tool # Define state class AgentState(TypedDict): messages: Annotated[list, add_messages] # Define tools @tool def search_database(query: str) -> str: """Search the product database for items matching the query.""" # Real implementation here return f"Found 3 products matching '{query}': Widget A ($10), Widget B ($20), Widget C ($30)" @tool def create_order(product_name: str, quantity: int) -> str: """Create an order for a product.""" order_id = f"ORD-{hash(product_name) % 10000:04d}" return f"Order {order_id} created: {quantity}x {product_name}" tools = [search_database, create_order] model = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools) # Define nodes def agent(state: AgentState) -> AgentState: response = model.invoke(state["messages"]) return {"messages": [response]} def should_continue(state: AgentState) -> str: last_message = state["messages"][-1] if last_message.tool_calls: return "tools" return END # Build graph graph = StateGraph(AgentState) graph.add_node("agent", agent) graph.add_node("tools", ToolNode(tools)) graph.add_edge(START, "agent") graph.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END}) graph.add_edge("tools", "agent") app = graph.compile() # Run result = app.invoke({ "messages": [{"role": "user", "content": "Find me a widget under $15 and order 2 of them"}] }) ``` ### 
Human-in-the-Loop with Checkpointing ```python from langgraph.checkpoint.sqlite import SqliteSaver from langgraph.graph import StateGraph, START, END class AgentState(TypedDict): messages: Annotated[list, add_messages] pending_approval: bool def agent(state: AgentState) -> AgentState: response = model.invoke(state["messages"]) return {"messages": [response]} def check_approval_needed(state: AgentState) -> str: last = state["messages"][-1] if last.tool_calls: # Require approval for order creation for tc in last.tool_calls: if tc["name"] == "create_order": return "needs_approval" return "tools" return END def request_approval(state: AgentState) -> AgentState: """Interrupt execution — human must approve before continuing.""" return {"pending_approval": True} # Build with interrupt graph = StateGraph(AgentState) graph.add_node("agent", agent) graph.add_node("tools", ToolNode(tools)) graph.add_node("approval", request_approval) graph.add_edge(START, "agent") graph.add_conditional_edges("agent", check_approval_needed, { "tools": "tools", "needs_approval": "approval", END: END, }) graph.add_edge("tools", "agent") graph.add_edge("approval", "tools") # After approval, execute the tool # Compile with checkpointing memory = SqliteSaver.from_conn_string(":memory:") app = graph.compile(checkpointer=memory, interrupt_before=["approval"]) # First run — stops at approval node config = {"configurable": {"thread_id": "order-123"}} result = app.invoke( {"messages": [{"role": "user", "content": "Order 5 Widget As"}]}, config=config, ) # State is saved. Agent is paused. 
# Human approves — resume from checkpoint result = app.invoke(None, config=config) # Continues from where it left off ``` ### TypeScript LangGraph ```typescript import { StateGraph, START, END, Annotation } from "@langchain/langgraph"; import { ChatOpenAI } from "@langchain/openai"; import { ToolNode } from "@langchain/langgraph/prebuilt"; import { tool } from "@langchain/core/tools"; import { z } from "zod"; import { BaseMessage, HumanMessage } from "@langchain/core/messages"; // State definition const AgentState = Annotation.Root({ messages: Annotation({ reducer: (prev, next) => [...prev, ...next], }), }); // Tools const searchTool = tool( async ({ query }) => { return `Results for "${query}": Product A, Product B`; }, { name: "search", description: "Search the product database", schema: z.object({ query: z.string() }), } ); const model = new ChatOpenAI({ model: "gpt-4o", temperature: 0 }).bindTools([searchTool]); // Nodes async function agent(state: typeof AgentState.State) { const response = await model.invoke(state.messages); return { messages: [response] }; } function shouldContinue(state: typeof AgentState.State) { const lastMsg = state.messages[state.messages.length - 1]; if ("tool_calls" in lastMsg && lastMsg.tool_calls?.length) { return "tools"; } return END; } // Graph const graph = new StateGraph(AgentState) .addNode("agent", agent) .addNode("tools", new ToolNode([searchTool])) .addEdge(START, "agent") .addConditionalEdges("agent", shouldContinue, { tools: "tools", [END]: END }) .addEdge("tools", "agent"); const app = graph.compile(); const result = await app.invoke({ messages: [new HumanMessage("Find products related to widgets")], }); ``` --- ## CrewAI: Multi-Agent Teams ```python # pip install crewai crewai-tools from crewai import Agent, Task, Crew, Process from crewai_tools import SerperDevTool, ScrapeWebsiteTool # Define specialized agents researcher = Agent( role="Senior Research Analyst", goal="Find comprehensive, accurate information about the 
given topic", backstory="You're a seasoned researcher with 15 years of experience in market analysis.", tools=[SerperDevTool(), ScrapeWebsiteTool()], verbose=True, allow_delegation=False, llm="gpt-4o", ) writer = Agent( role="Technical Writer", goal="Create clear, engaging content based on research findings", backstory="You're a technical writer who excels at making complex topics accessible.", verbose=True, llm="gpt-4o", ) editor = Agent( role="Editor", goal="Review and polish the content for accuracy, clarity, and engagement", backstory="You're a meticulous editor with an eye for detail and factual accuracy.", verbose=True, llm="gpt-4o", ) # Define tasks research_task = Task( description="Research the current state of {topic}. Find key trends, statistics, and expert opinions.", expected_output="A comprehensive research brief with key findings, statistics, and sources.", agent=researcher, ) writing_task = Task( description="Write a 1500-word article based on the research brief.", expected_output="A well-structured article with introduction, key sections, and conclusion.", agent=writer, context=[research_task], # Uses output from research ) editing_task = Task( description="Edit the article for clarity, accuracy, and engagement. 
Fix any factual errors.", expected_output="A polished, publication-ready article.", agent=editor, context=[writing_task], ) # Assemble crew crew = Crew( agents=[researcher, writer, editor], tasks=[research_task, writing_task, editing_task], process=Process.sequential, # or Process.hierarchical with a manager verbose=True, ) result = crew.kickoff(inputs={"topic": "AI agents in production"}) ``` --- ## Tool Design: Best Practices ### Error Recovery and Timeout Handling ```python import asyncio from functools import wraps from langchain_core.tools import tool def with_timeout(seconds: int = 30): def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): try: return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds) except asyncio.TimeoutError: return f"Error: Tool timed out after {seconds}s. Try a simpler query." return wrapper return decorator def with_retry(max_retries: int = 3): def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): last_error = None for attempt in range(max_retries): try: return await func(*args, **kwargs) except Exception as e: last_error = e if attempt < max_retries - 1: await asyncio.sleep(2 ** attempt) return f"Error after {max_retries} retries: {str(last_error)}" return wrapper return decorator @tool @with_retry(3) @with_timeout(30) async def query_database(sql: str) -> str: """Execute a read-only SQL query against the analytics database. Args: sql: A SELECT query. Must not contain INSERT, UPDATE, DELETE, or DROP. """ # Validate — never let an LLM run arbitrary SQL forbidden = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE"] if any(word in sql.upper() for word in forbidden): return "Error: Only SELECT queries are allowed." result = await db.execute(sql) if len(result) > 100: return f"Query returned {len(result)} rows. Showing first 20:\n{format_rows(result[:20])}" return format_rows(result) ``` ### Tool Design Rules 1. 
**Clear descriptions** — the LLM reads them to decide when to use the tool 2. **Validate inputs** — never trust LLM-generated parameters 3. **Return errors as strings** — don't throw exceptions, let the agent recover 4. **Limit output size** — truncate large results, the context window is precious 5. **Make tools idempotent** where possible — agents retry 6. **Include examples in docstrings** — helps the LLM use tools correctly --- ## Memory Patterns ### Conversation Buffer with Sliding Window ```python from langchain_core.messages import trim_messages # Keep last N messages, but always keep the system message trimmer = trim_messages( max_tokens=4000, strategy="last", token_counter=model, include_system=True, allow_partial=False, ) # In your agent node def agent(state: AgentState) -> AgentState: trimmed = trimmer.invoke(state["messages"]) response = model.invoke(trimmed) return {"messages": [response]} ``` ### Summary Memory ```python from langchain_core.messages import SystemMessage async def maybe_summarize(state: AgentState) -> AgentState: messages = state["messages"] if len(messages) < 20: return state # Summarize older messages, keep recent ones old_messages = messages[1:-10] # Skip system, keep last 10 recent = messages[-10:] summary = await model.ainvoke([ SystemMessage(content="Summarize this conversation concisely, preserving key facts and decisions:"), *old_messages, ]) return { "messages": [ messages[0], # System message SystemMessage(content=f"Previous conversation summary: {summary.content}"), *recent, ] } ``` ### Vector Store Memory (Long-term) ```python from langchain_openai import OpenAIEmbeddings from langchain_community.vectorstores import Chroma embeddings = OpenAIEmbeddings(model="text-embedding-3-small") memory_store = Chroma( collection_name="agent_memory", embedding_function=embeddings, persist_directory="./memory_db", ) @tool def recall_memory(query: str) -> str: """Search past conversations and learned facts for relevant information.""" 
docs = memory_store.similarity_search(query, k=5) if not docs: return "No relevant memories found." return "\n\n".join([ f"[{doc.metadata.get('timestamp', 'unknown')}] {doc.page_content}" for doc in docs ]) @tool def store_memory(fact: str, category: str = "general") -> str: """Store an important fact or learning for future reference.""" memory_store.add_texts( texts=[fact], metadatas=[{ "category": category, "timestamp": datetime.now().isoformat(), }], ) return f"Stored: {fact}" ``` --- ## RAG Pipeline: Production Patterns ### Chunking Strategies ```python from langchain_text_splitters import RecursiveCharacterTextSplitter # For general documents splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, separators=["\n\n", "\n", ". ", " ", ""], length_function=len, ) # For code code_splitter = RecursiveCharacterTextSplitter.from_language( language=Language.PYTHON, chunk_size=1500, chunk_overlap=200, ) # For markdown with structure preservation markdown_splitter = RecursiveCharacterTextSplitter.from_language( language=Language.MARKDOWN, chunk_size=1000, chunk_overlap=100, ) ``` ### Hybrid Search (Vector + Keyword) ```python from langchain_community.retrievers import BM25Retriever from langchain.retrievers import EnsembleRetriever # Vector search (semantic) vector_retriever = vector_store.as_retriever(search_kwargs={"k": 5}) # Keyword search (BM25) bm25_retriever = BM25Retriever.from_documents(documents, k=5) # Combine with weights hybrid_retriever = EnsembleRetriever( retrievers=[vector_retriever, bm25_retriever], weights=[0.6, 0.4], # Favor semantic, but keyword catches exact matches ) ``` ### Reranking ```python from langchain.retrievers import ContextualCompressionRetriever from langchain_cohere import CohereRerank # Retrieve broadly, then rerank for precision reranker = CohereRerank(model="rerank-english-v3.0", top_n=3) retriever = ContextualCompressionRetriever( base_compressor=reranker, base_retriever=hybrid_retriever, # Gets 20 
candidates ) # Usage: retriever.invoke("How do I configure CORS?") # Returns top 3 most relevant chunks from the initial 20 ``` ### Citation Pattern ```python from langchain_core.prompts import ChatPromptTemplate RAG_PROMPT = ChatPromptTemplate.from_messages([ ("system", """Answer the question based on the provided context. Include citations using [1], [2] etc. referencing the source documents. If the context doesn't contain the answer, say so — don't make things up. Context: {context}"""), ("human", "{question}"), ]) def format_docs_with_citations(docs): formatted = [] for i, doc in enumerate(docs, 1): source = doc.metadata.get("source", "unknown") formatted.append(f"[{i}] (Source: {source})\n{doc.page_content}") return "\n\n".join(formatted) ``` --- ## Multi-Agent Patterns ### Supervisor Pattern ```python from langgraph.graph import StateGraph, START, END class SupervisorState(TypedDict): messages: Annotated[list, add_messages] next_agent: str def supervisor(state: SupervisorState) -> SupervisorState: """Route to the appropriate specialist agent.""" response = supervisor_model.invoke([ SystemMessage(content="""You are a supervisor routing tasks to specialists: - researcher: for finding information - coder: for writing or reviewing code - writer: for creating content Respond with JSON: {"next": "agent_name"} or {"next": "FINISH"}"""), *state["messages"], ]) decision = json.loads(response.content) return {"next_agent": decision["next"]} def route(state: SupervisorState) -> str: return state["next_agent"] graph = StateGraph(SupervisorState) graph.add_node("supervisor", supervisor) graph.add_node("researcher", researcher_agent) graph.add_node("coder", coder_agent) graph.add_node("writer", writer_agent) graph.add_edge(START, "supervisor") graph.add_conditional_edges("supervisor", route, { "researcher": "researcher", "coder": "coder", "writer": "writer", "FINISH": END, }) # All agents report back to supervisor for agent in ["researcher", "coder", "writer"]: 
    graph.add_edge(agent, "supervisor")

app = graph.compile()
```

---

## Production Concerns

### Cost Tracking

```python
class CostTracker:
    # USD per 1M tokens, as of 2024; check current provider pricing before relying on these
    PRICES = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
    }

    def __init__(self):
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.total_cost = 0.0
        self.calls = []

    def track(self, model: str, input_tokens: int, output_tokens: int):
        # Unknown models are recorded at zero cost rather than raising
        prices = self.PRICES.get(model, {"input": 0, "output": 0})
        cost = (input_tokens * prices["input"] + output_tokens * prices["output"]) / 1_000_000
        self.total_input_tokens += input_tokens
        self.total_output_tokens += output_tokens
        self.total_cost += cost
        self.calls.append({"model": model, "input": input_tokens, "output": output_tokens, "cost": cost})

    def report(self) -> str:
        return (
            f"Total: {len(self.calls)} calls, "
            f"{self.total_input_tokens} input + {self.total_output_tokens} output tokens, "
            f"${self.total_cost:.4f}"
        )
```

### Streaming Responses

```python
from langchain_core.messages import HumanMessage

# LangGraph streaming (run this inside an async function)
async for event in app.astream_events(
    {"messages": [HumanMessage(content="Hello")]},
    version="v2",
):
    if event["event"] == "on_chat_model_stream":
        chunk = event["data"]["chunk"]
        print(chunk.content, end="", flush=True)
    elif event["event"] == "on_tool_start":
        print(f"\n[Using tool: {event['name']}]")
```

### Fallback Models

```python
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

primary = ChatOpenAI(model="gpt-4o", timeout=30)
fallback = ChatAnthropic(model="claude-3-5-sonnet-20241022", timeout=30)

model = primary.with_fallbacks([fallback])
# Automatically tries fallback if primary fails
```

---

## Safety: Prompt Injection Defense

### Input Validation

```python
import re

def sanitize_user_input(text: str) -> str:
    """Basic prompt injection defense."""
    # Remove common injection patterns
    suspicious_patterns = [
        r"ignore (?:all )?(?:previous |prior |above )?instructions",
        r"you are now",
        r"new instructions:",
        r"system prompt:",
        # Chat-template control tokens. A leading empty alternative here would
        # match every input, so each alternative must be a real token.
        r"<\|im_start\|>|<\|im_end\|>|<\|endoftext\|>",
    ]
    for pattern in suspicious_patterns:
        if re.search(pattern, text, re.IGNORECASE):
            return "[Input contained suspicious patterns and was filtered]"
    return text
```

### Output Validation

```python
from pydantic import BaseModel, field_validator

class AgentResponse(BaseModel):
    answer: str
    sources: list[str]
    confidence: float

    @field_validator("answer")
    @classmethod
    def no_system_leaks(cls, v: str) -> str:
        forbidden = ["system prompt", "you are an AI", "as an AI language model"]
        for phrase in forbidden:
            if phrase.lower() in v.lower():
                raise ValueError("Response contained forbidden content")
        return v

    @field_validator("confidence")
    @classmethod
    def valid_range(cls, v: float) -> float:
        if not 0 <= v <= 1:
            raise ValueError("Confidence must be between 0 and 1")
        return v
```

---

## Evaluation

### LLM-as-Judge

```python
import json

from langchain_core.messages import HumanMessage

EVAL_PROMPT = """Rate the following AI response on a scale of 1-5:

Question: {question}
Response: {response}
Reference Answer: {reference}

Criteria:
- Accuracy (does it match the reference?)
- Completeness (does it cover all key points?)
- Clarity (is it well-written and easy to understand?)
Respond with JSON: {{"accuracy": N, "completeness": N, "clarity": N, "reasoning": "..."}}"""
# The literal braces above are doubled so that str.format() leaves them in place

async def evaluate_response(question: str, response: str, reference: str) -> dict:
    result = await eval_model.ainvoke(
        EVAL_PROMPT.format(question=question, response=response, reference=reference)
    )
    return json.loads(result.content)

# Run evaluation suite
async def run_eval_suite(agent, test_cases: list[dict]) -> dict:
    results = []
    for case in test_cases:
        response = await agent.ainvoke({"messages": [HumanMessage(content=case["question"])]})
        answer = response["messages"][-1].content
        score = await evaluate_response(case["question"], answer, case["expected"])
        results.append({"case": case["question"], "score": score})

    avg_accuracy = sum(r["score"]["accuracy"] for r in results) / len(results)
    avg_completeness = sum(r["score"]["completeness"] for r in results) / len(results)
    return {"results": results, "avg_accuracy": avg_accuracy, "avg_completeness": avg_completeness}
```

### Regression Testing

```python
# tests/test_agent.py
import pytest
from langchain_core.messages import HumanMessage

# `agent` is the compiled graph under test; import it from your application module

REGRESSION_CASES = [
    {
        "input": "What's the refund policy?",
        "must_contain": ["30 days", "full refund"],
        "must_not_contain": ["no refunds"],
    },
    {
        "input": "How do I cancel my subscription?",
        "must_contain": ["settings", "billing"],
        "must_use_tools": ["search_knowledge_base"],
    },
]

@pytest.mark.asyncio  # async tests need the pytest-asyncio plugin
@pytest.mark.parametrize("case", REGRESSION_CASES)
async def test_agent_regression(case):
    result = await agent.ainvoke({"messages": [HumanMessage(content=case["input"])]})
    answer = result["messages"][-1].content.lower()

    for phrase in case.get("must_contain", []):
        assert phrase.lower() in answer, f"Missing: {phrase}"
    for phrase in case.get("must_not_contain", []):
        assert phrase.lower() not in answer, f"Should not contain: {phrase}"

    # Collect the names of every tool the agent called during the run
    used_tools = {
        call["name"]
        for message in result["messages"]
        for call in getattr(message, "tool_calls", None) or []
    }
    for tool_name in case.get("must_use_tools", []):
        assert tool_name in used_tools, f"Tool not used: {tool_name}"
```

---

## Checklist: Production Agent

- [ ] Tools have clear descriptions, input validation, and error handling
- [ ] Timeouts on all tool calls and LLM invocations
- [ ] Cost tracking per conversation/user
- [ ] Fallback models configured
- [ ] Streaming for user-facing responses
- [ ] Conversation memory with size limits
- [ ] Prompt injection defense (input sanitization)
- [ ] Output validation (no system prompt leaks)
- [ ] Human-in-the-loop for high-stakes actions
- [ ] Checkpointing for long-running workflows
- [ ] Evaluation suite with regression tests
- [ ] Token usage monitoring and alerts
- [ ] Rate limiting per user
- [ ] Logging of all tool calls and responses
- [ ] Graceful degradation when tools fail

---

## MCP (Model Context Protocol) Integration

MCP is an open protocol for connecting agents to external tools. Instead of hardcoding tool implementations, agents connect to MCP servers that expose tools over a standardized protocol.

### Building an MCP Server

```typescript
// mcp-server.ts: expose tools for any MCP-compatible agent
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import express from 'express';
import { z } from 'zod';

// Build a fresh server per request: in stateless mode a shared instance
// can mix up concurrent requests.
function createServer(): McpServer {
  const server = new McpServer({ name: 'my-tools', version: '1.0.0' });

  // Register tools with typed parameters (the SDK takes a Zod schema per parameter)
  server.tool('search_docs', 'Search internal documentation by query', {
    query: z.string().describe('Search query'),
    limit: z.number().optional().describe('Max results (default 10)'),
  }, async ({ query, limit = 10 }) => {
    const results = await searchIndex(query, limit);
    return {
      content: [{ type: 'text', text: JSON.stringify(results, null, 2) }],
    };
  });

  server.tool('create_ticket', 'Create a support ticket in Jira', {
    title: z.string().describe('Ticket title'),
    priority: z.string().describe('low | medium | high | critical'),
    description: z.string().describe('Detailed description'),
  }, async ({ title, priority, description }) => {
    // Validate before acting: agents sometimes pass malformed values
    if (!['low', 'medium', 'high', 'critical'].includes(priority)) {
      throw new Error(`Invalid priority "${priority}". Must be: low, medium, high, critical`);
    }
    const ticket = await jira.createIssue({ summary: title, priority, description });
    return {
      content: [{ type: 'text', text: `Created ticket ${ticket.key}: ${ticket.self}` }],
    };
  });

  return server;
}

// Streamable HTTP transport (replaces deprecated SSE transport)
const app = express();
app.use(express.json());

app.post('/mcp', async (req, res) => {
  const server = createServer();
  const transport = new StreamableHTTPServerTransport({
    sessionIdGenerator: undefined, // stateless
  });
  res.on('close', () => {
    transport.close();
    server.close();
  });
  await server.connect(transport);
  // express.json() has already consumed the body, so pass the parsed value along
  await transport.handleRequest(req, res, req.body);
});

app.listen(3100, () => console.log('MCP server on :3100'));
```

### Connecting LangGraph to MCP Tools

```python
# Use MCP tools inside a LangGraph agent
from langchain_core.tools import StructuredTool
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

# create_schema_from_json is a helper you supply (JSON Schema -> Pydantic model)

async def get_mcp_tools(server_url: str) -> list:
    """Fetch tool definitions from an MCP server and convert to LangChain tools."""
    # The server above speaks Streamable HTTP, so the client must use the same transport
    async with streamablehttp_client(server_url) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            mcp_tools = await session.list_tools()

    langchain_tools = []
    for tool in mcp_tools.tools:
        # Bind the tool name as a default argument so each closure keeps its own name
        async def call_tool(name=tool.name, **kwargs):
            async with streamablehttp_client(server_url) as (r, w, _):
                async with ClientSession(r, w) as s:
                    await s.initialize()
                    result = await s.call_tool(name, kwargs)
                    return result.content[0].text

        langchain_tools.append(StructuredTool(
            name=tool.name,
            description=tool.description or "",
            coroutine=call_tool,  # async callables go in `coroutine`, not `func`
            args_schema=create_schema_from_json(tool.inputSchema),
        ))

    return langchain_tools
```

---

## Deployment: Containerized Agent

```dockerfile
# Dockerfile: production agent with health checks
FROM python:3.12-slim AS base

RUN pip install --no-cache-dir langgraph langchain-openai redis uvicorn fastapi tiktoken

WORKDIR /app
COPY . .

# Non-root user
RUN useradd -m agent && chown -R agent:agent /app
USER agent

# python:3.12-slim does not ship curl, so probe with the Python standard library
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1

EXPOSE 8000
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
```

```python
# server.py: FastAPI wrapper with streaming, cost tracking, rate limiting
import json
import time
from collections import defaultdict

import tiktoken
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, StreamingResponse
from langchain_core.messages import HumanMessage

# `agent` is the compiled LangGraph app; import it from your application module

app = FastAPI()
enc = tiktoken.encoding_for_model("gpt-4o")

# In-memory rate limiter (use Redis in production)
request_counts: dict[str, list[float]] = defaultdict(list)
RATE_LIMIT = 20  # requests per minute

@app.middleware("http")
async def rate_limit(request: Request, call_next):
    api_key = request.headers.get("x-api-key", "anonymous")
    now = time.time()
    request_counts[api_key] = [t for t in request_counts[api_key] if now - t < 60]
    if len(request_counts[api_key]) >= RATE_LIMIT:
        # Exceptions raised inside middleware bypass FastAPI's handlers,
        # so return the 429 response directly
        return JSONResponse(status_code=429, content={"detail": "Rate limit exceeded"})
    request_counts[api_key].append(now)
    return await call_next(request)

@app.post("/chat")
async def chat(request: Request):
    body = await request.json()
    user_msg = body["message"]
    api_key = request.headers.get("x-api-key")

    # Token counting for cost tracking
    input_tokens = len(enc.encode(user_msg))

    async def stream():
        total_output_tokens = 0
        async for event in agent.astream_events(
            {"messages": [HumanMessage(content=user_msg)]},
            version="v2",
        ):
            if event["event"] == "on_chat_model_stream":
                chunk = event["data"]["chunk"].content
                if chunk:
                    total_output_tokens += len(enc.encode(chunk))
                    yield f"data: {json.dumps({'text': chunk})}\n\n"

        # Log cost (GPT-4o pricing: $2.50/1M input, $10/1M output)
        cost = (input_tokens * 2.50 + total_output_tokens * 10.0) / 1_000_000
        yield f"data: {json.dumps({'done': True, 'tokens': {'in': input_tokens, 'out': total_output_tokens}, 'cost_usd': round(cost, 6)})}\n\n"

    return StreamingResponse(stream(),
        media_type="text/event-stream")

start_time = time.time()  # process start, reported by /health

@app.get("/health")
async def health():
    return {"status": "ok", "model": "gpt-4o", "uptime": time.time() - start_time}
```

---

## Cost Control

```python
# Cost-aware model routing: use cheap models when possible
from datetime import datetime

from langchain_openai import ChatOpenAI

MODELS = {
    "fast": ChatOpenAI(model="gpt-4o-mini", temperature=0),   # $0.15/1M in
    "smart": ChatOpenAI(model="gpt-4o", temperature=0),       # $2.50/1M in
    "reasoning": ChatOpenAI(model="o1", temperature=1),       # $15/1M in
}

def select_model(task_type: str, input_length: int) -> str:
    """Route to cheapest model that can handle the task."""
    # Check demanding task types first so a short prompt does not
    # send a hard task to the cheapest model
    if task_type in ("code_generation", "complex_reasoning"):
        return "reasoning"
    if task_type == "classification" or input_length < 500:
        return "fast"
    return "smart"

# Budget enforcement
class BudgetExceededError(Exception):
    """Raised when a request would push spend past the daily limit."""

class BudgetTracker:
    def __init__(self, daily_limit_usd: float = 10.0):
        self.daily_limit = daily_limit_usd
        self.spent_today = 0.0
        self.last_reset = datetime.now().date()

    def check_budget(self, estimated_cost: float) -> bool:
        # Reset the running total when the calendar day changes
        if datetime.now().date() > self.last_reset:
            self.spent_today = 0.0
            self.last_reset = datetime.now().date()
        if self.spent_today + estimated_cost > self.daily_limit:
            raise BudgetExceededError(f"Daily budget ${self.daily_limit} exceeded")
        return True

    def record_spend(self, cost: float):
        self.spent_today += cost
```

## aleph-cloud-self-deployment (v2.0.0)

Category: web3

Description: Skill for AI agents to autonomously deploy themselves onto Aleph Cloud decentralized infrastructure. Covers VM provisioning, CRN discovery, agent framework installation, gateway configuration, identity transfer, and becoming fully operational on decentralized compute, all without human intervention.

Features:
- Aleph Cloud account setup with credits or $ALEPH tokens
- Programmatic CRN discovery and scoring
- Non-interactive VM instance creation (bypasses TUI)
- OpenClaw agent framework installation and configuration
- Gateway auth with auth-profiles.json (not auth.json)
- SSH keypair generation and secure key transfer
- Recursive multi-instance deployment enablement
- All-in-one deploy.sh and setup-agent.sh scripts
- Cost planning across all 6 compute tiers
- Comprehensive troubleshooting for real-world issues

# Aleph Cloud Self-Deployment v2.0: Multi-Node Fleet Management

Comprehensive framework for deploying and managing OpenClaw instances across Aleph Cloud infrastructure with advanced orchestration, auto-provisioning protocols, and enterprise-grade reliability.

## Table of Contents

1. [Infrastructure Planning & Architecture](#infrastructure-planning--architecture)
2. [Single Node Deployment Foundation](#single-node-deployment-foundation)
3. [Multi-Node Fleet Management](#multi-node-fleet-management)
4. [Auto-Provisioning Protocol (SRP)](#auto-provisioning-protocol-srp)
5. [Inter-VM Communication Networks](#inter-vm-communication-networks)
6. [Load Distribution & Orchestration](#load-distribution--orchestration)
7. [Disaster Recovery & Auto-Recreation](#disaster-recovery--auto-recreation)
8. [Cost Optimization Strategies](#cost-optimization-strategies)
9. [Security Hardening Framework](#security-hardening-framework)
10. [Monitoring & Maintenance](#monitoring--maintenance)

---

## Infrastructure Planning & Architecture

### Aleph Cloud Architecture Overview

**Network Topology:**
```
┌─────────────────────────────────────────────────────────┐
│                   Aleph Cloud Network                   │
├─────────────────┬─────────────────┬─────────────────────┤
│  Primary Node   │  Worker Node 1  │    Worker Node 2    │
│  (Orchestrator) │    (Compute)    │      (Compute)      │
│                 │                 │                     │
│ • Fleet Manager │ • OpenClaw      │ • OpenClaw          │
│ • Load Balancer │ • Tailscale     │ • Tailscale         │
│ • Backup Coord  │ • Health Mon    │ • Health Mon        │
│ • SSH Gateway   │ • Auto-Restart  │ • Auto-Restart      │
└─────────────────┴─────────────────┴─────────────────────┘
         │                 │                 │
         └─────────────────┼─────────────────┘
                Tailscale Mesh Network
                      SSH Tunnels
```

**Resource Planning Matrix:**
```yaml
Node Types:
  Orchestrator (Primary):
    CRN: aleph.im
    Tier: 4 vCPU, 8GB RAM, 100GB SSD
    Role: Fleet management, load balancing, coordination
    Cost: ~50 ALEPH/month

  Compute Nodes (Workers):
    CRN: aleph.im, twentysix.cloud, cybernodes.io
    Tier: 2 vCPU, 4GB RAM, 50GB SSD
    Role: OpenClaw instances, task execution
    Cost: ~25 ALEPH/month each

  Backup Node (Optional):
    CRN: Different provider for redundancy
    Tier: 1 vCPU, 2GB RAM, 20GB SSD
    Role: Configuration backup, emergency recovery
    Cost: ~15 ALEPH/month

Total Monthly Cost (5-node setup): ~165 ALEPH (~$50-80 USD)
```

### CRN Selection Strategy

**Provider Tier Assessment:**
```bash
#!/bin/bash
# CRN evaluation script

evaluate_crn() {
    local crn_url=$1
    local crn_name=$2

    echo "=== Evaluating $crn_name ($crn_url) ==="

    # Performance test
    echo "Performance Test:"
    time curl -s "$crn_url/api/v0/messages" | head -10

    # Availability check
    echo "Availability Check:"
    for i in {1..5}; do
        response=$(curl -s -w "%{http_code}" -o /dev/null "$crn_url/api/v0/messages")
        echo "Attempt $i: HTTP $response"
        sleep 2
    done

    # Geographic latency
    echo "Latency Test:"
    ping -c 3 "${crn_url#https://}" | grep "time="

    echo "------------------------"
}

# Test major CRNs
evaluate_crn "https://api2.aleph.im" "Official Aleph.im"
evaluate_crn "https://api.twentysix.cloud" "TwentySix Cloud"
evaluate_crn "https://api.cybernodes.io" "CyberNodes"
evaluate_crn "https://api.nft.storage" "NFT.Storage"

# Generate recommendation
echo "=== CRN RECOMMENDATIONS ==="
echo "Primary (Orchestrator): aleph.im (highest reliability)"
echo "Workers: Mix of twentysix.cloud + cybernodes.io (cost optimization)"
echo "Backup: Different provider for redundancy"
```

---

## Single Node Deployment Foundation

### Prerequisites & Setup

**Local Environment Setup:**
```bash
#!/bin/bash
# setup-aleph-environment.sh

set -e

echo "🚀 Setting up Aleph Cloud deployment environment..."

# Install aleph CLI
if ! command -v aleph &> /dev/null; then
    echo "Installing Aleph CLI..."
    pip3 install aleph-client
    # Alternative: npm install -g aleph-js
fi

# Verify installation
aleph --version

# Create deployment directory structure
mkdir -p ~/.aleph-deploy/{keys,configs,scripts,backups}

# Generate SSH key pair for VMs
if [[ ! -f ~/.aleph-deploy/keys/aleph_rsa ]]; then
    echo "Generating SSH key pair..."
    ssh-keygen -t rsa -b 4096 -f ~/.aleph-deploy/keys/aleph_rsa -N "" -C "aleph-fleet-$(date +%Y%m%d)"
fi

# Create aleph account configuration
cat > ~/.aleph-deploy/configs/account.json << 'EOF'
{
  "private_key": null,
  "address": null,
  "mnemonic": null,
  "created": null
}
EOF

echo "✅ Environment setup complete!"
echo "Next steps:"
echo "1. Run: aleph account create"
echo "2. Fund your account with ALEPH tokens"
echo "3. Configure your deployment parameters"
```

**Account Creation & Funding:**
```bash
#!/bin/bash
# account-setup.sh

echo "🔑 Setting up Aleph account..."

# Create new account or import existing
read -p "Do you want to (c)reate new account or (i)mport existing? " choice

case $choice in
    c|C)
        echo "Creating new account..."
        aleph account create --replace
        ;;
    i|I)
        echo "Import your private key or mnemonic..."
        aleph account import-private-key
        ;;
    *)
        echo "Invalid choice"
        exit 1
        ;;
esac

# Display account info
echo "Account created/imported:"
aleph account show

# Check balance
balance=$(aleph balance)
echo "Current balance: $balance ALEPH"

if (( $(echo "$balance < 100" | bc -l) )); then
    echo "⚠️ WARNING: Low balance. You need ~165 ALEPH for a 5-node deployment."
    echo "Fund your account at: https://aleph.im"
    echo "Your address: $(aleph account show | grep Address | cut -d: -f2 | xargs)"
fi

echo "✅ Account setup complete!"
```

### Single VM Deployment

**Basic VM Deployment Script:**
```bash
#!/bin/bash
# deploy-single-vm.sh

set -e

# Configuration
VM_NAME="${1:-openclaw-primary}"
CRN_URL="${2:-https://api2.aleph.im}"
VM_TYPE="${3:-vm-standard-2}"
DISK_SIZE="${4:-50}"

echo "🚀 Deploying single VM: $VM_NAME"

# Read SSH public key
SSH_PUB_KEY=$(cat ~/.aleph-deploy/keys/aleph_rsa.pub)

# Create VM deployment
aleph instance create \
    --name "$VM_NAME" \
    --image-ref "ubuntu:22.04" \
    --vcpus 2 \
    --memory 4096 \
    --disk-size "$DISK_SIZE" \
    --ssh-authorized-keys "$SSH_PUB_KEY" \
    --crn "$CRN_URL" \
    --volumes '[{"name":"data","mount_path":"/data","size_gb":20,"persistence":true}]' \
    --environment-variables '{
        "OPENCLAW_VERSION":"latest",
        "NODE_ENV":"production",
        "DEPLOY_TYPE":"aleph-cloud"
    }' \
    --setup-script "$(cat << 'SETUP_SCRIPT'
#!/bin/bash
set -e

# Update system
apt-get update && apt-get upgrade -y

# Install essential packages
apt-get install -y curl wget git htop unzip jq fail2ban ufw nodejs npm

# Install Docker
# Note: In production, verify checksums before running downloaded scripts
curl -fsSL https://get.docker.com -o get-docker.sh
sh get-docker.sh
usermod -aG docker ubuntu

# Install Docker Compose
curl -L "https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose

# Setup firewall
ufw default deny incoming
ufw default allow outgoing
ufw allow ssh
ufw allow 80
ufw allow 443
ufw --force enable

# Install OpenClaw
curl -fsSL https://raw.githubusercontent.com/openclaw/openclaw/main/install.sh | bash

# Configure OpenClaw for production
mkdir -p /opt/openclaw/config
# Unquoted delimiter so $(hostname) expands on the VM; a quoted 'CONFIG'
# would write the variable name literally into the JSON
cat > /opt/openclaw/config/production.json << CONFIG
{
  "server": {
    "port": 3000,
    "host": "0.0.0.0",
    "cluster": true
  },
  "logging": {
    "level": "info",
    "file": "/var/log/openclaw/app.log"
  },
  "aleph": {
    "node_id": "$(hostname)",
    "deployment_type": "cloud",
    "auto_restart": true
  }
}
CONFIG

# Create systemd service
cat > /etc/systemd/system/openclaw.service << 'SERVICE'
[Unit]
Description=OpenClaw Service
After=network.target

[Service]
Type=simple
User=ubuntu
WorkingDirectory=/opt/openclaw
Environment=NODE_ENV=production
ExecStart=/usr/bin/node server.js
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=openclaw

[Install]
WantedBy=multi-user.target
SERVICE

# Enable and start OpenClaw
systemctl enable openclaw
systemctl start openclaw

# Install monitoring agent
cat > /opt/monitor-node.sh << 'MONITOR'
#!/bin/bash
while true; do
    timestamp=$(date '+%Y-%m-%d %H:%M:%S')
    load=$(uptime | awk -F'load average:' '{print $2}')
    memory=$(free | grep Mem | awk '{printf "%.1f%%", $3/$2 * 100.0}')
    disk=$(df -h / | awk 'NR==2{printf "%s", $5}')

    echo "$timestamp - Load:$load Memory:$memory Disk:$disk" >> /var/log/node-stats.log

    # Health check OpenClaw
    if ! systemctl is-active --quiet openclaw; then
        echo "$timestamp - OpenClaw service down, restarting..." >> /var/log/node-stats.log
        systemctl restart openclaw
    fi

    sleep 60
done
MONITOR

chmod +x /opt/monitor-node.sh

# Use systemd instead of nohup: nohup processes are unsupervised
# and won't restart if they crash
cat > /etc/systemd/system/node-monitor.service << 'MONITOR_SVC'
[Unit]
Description=Node health monitor
After=openclaw.service

[Service]
Type=simple
ExecStart=/opt/monitor-node.sh
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
MONITOR_SVC

systemctl daemon-reload
systemctl enable node-monitor
systemctl start node-monitor

echo "✅ VM setup complete!"
SETUP_SCRIPT
)"

echo "✅ VM deployment initiated!"
echo "Monitoring deployment status..."

# Wait for deployment to complete
aleph instance status "$VM_NAME" --wait

# Get VM connection details
VM_INFO=$(aleph instance get "$VM_NAME")
VM_IP=$(echo "$VM_INFO" | jq -r '.networking.ipv4')

echo "🎉 VM deployed successfully!"
echo "SSH Connection: ssh -i ~/.aleph-deploy/keys/aleph_rsa ubuntu@$VM_IP"
echo "OpenClaw URL: http://$VM_IP:3000"

# Test connection
echo "Testing SSH connection..."
# Use accept-new instead of no: it accepts the first connection but rejects changed host keys (MITM protection)
ssh -i ~/.aleph-deploy/keys/aleph_rsa -o StrictHostKeyChecking=accept-new ubuntu@"$VM_IP" "echo 'SSH connection successful!'"
```

---

## Multi-Node Fleet Management

### Fleet Deployment Orchestrator

**Master Deployment Script:**
```bash
#!/bin/bash
# deploy-fleet.sh

set -e

# Fleet Configuration
FLEET_NAME="${1:-openclaw-fleet}"
NODE_COUNT="${2:-5}"
PRIMARY_CRN="https://api2.aleph.im"
WORKER_CRNS=("https://api.twentysix.cloud" "https://api.cybernodes.io" "https://api.nft.storage")

echo "🚀 Deploying OpenClaw fleet: $FLEET_NAME with $NODE_COUNT nodes"

# Fleet configuration
cat > ~/.aleph-deploy/configs/fleet.json << EOF
{
  "fleet_name": "$FLEET_NAME",
  "deployment_date": "$(date -Iseconds)",
  "node_count": $NODE_COUNT,
  "primary_node": null,
  "worker_nodes": [],
  "network": {
    "tailscale_key": null,
    "ssh_tunnel_port": 2222,
    "load_balancer_port": 8080
  },
  "replication": {
    "enabled": true,
    "sync_interval": 300,
    "backup_retention": 7
  }
}
EOF

deploy_primary_node() {
    echo "📊 Deploying Primary Node (Orchestrator)..."

    local node_name="${FLEET_NAME}-primary"
    local setup_script=$(cat << 'PRIMARY_SETUP'
#!/bin/bash
set -e

# Standard VM setup
apt-get update && apt-get upgrade -y
apt-get install -y curl wget git htop jq fail2ban ufw nodejs npm docker.io docker-compose

# Create a dedicated non-root user for fleet services.
# Running all services as root is a security risk: a compromise in any
# service gives full system access. Use a dedicated user for fleet-manager.
useradd -r -s /usr/sbin/nologin -d /opt/fleet-manager fleetmgr || true

# Install fleet management tools
mkdir -p /opt/fleet-manager
cd /opt/fleet-manager

# Fleet Manager Application
cat > fleet-manager.js << 'FLEET_MANAGER'
const express = require('express');
const { exec } = require('child_process');
const fs = require('fs');
const crypto = require('crypto');
const app = express();

app.use(express.json());

// API key auth middleware: fleet manager should NOT be open to the internet.
// Bind to Tailscale IP or localhost, and require an API key for all requests.
const FLEET_API_KEY = process.env.FLEET_API_KEY || crypto.randomBytes(32).toString('hex');
if (!process.env.FLEET_API_KEY) {
  console.log(`Generated FLEET_API_KEY: ${FLEET_API_KEY}`);
  console.log('Set FLEET_API_KEY env var to persist across restarts.');
}

function requireAuth(req, res, next) {
  const key = req.headers['x-api-key'] || req.query.api_key;
  if (!key || key !== FLEET_API_KEY) {
    return res.status(401).json({ error: 'Unauthorized' });
  }
  next();
}

app.use(requireAuth);

// Fleet status endpoint
app.get('/fleet/status', (req, res) => {
  try {
    const data = fs.readFileSync('/opt/fleet-manager/nodes.json', 'utf8');
    res.json(JSON.parse(data));
  } catch (err) {
    if (err.code === 'ENOENT') {
      res.json({ nodes: [] });
    } else {
      res.status(500).json({ error: err.message });
    }
  }
});

// Health check endpoint
app.get('/health', (req, res) => {
  res.json({ status: 'healthy', timestamp: new Date().toISOString() });
});

// Node registration endpoint
app.post('/fleet/register', (req, res) => {
  const { node_id, ip_address, capabilities } = req.body;

  let fleet;
  try {
    fleet = JSON.parse(fs.readFileSync('/opt/fleet-manager/nodes.json', 'utf8'));
  } catch {
    fleet = { nodes: [] };
  }

  // Update or add node
  const existingIndex = fleet.nodes.findIndex(n => n.node_id === node_id);
  const nodeData = {
    node_id,
    ip_address,
    capabilities,
    last_seen: new Date().toISOString(),
    status: 'active'
  };

  if (existingIndex >= 0) {
    fleet.nodes[existingIndex] = nodeData;
  } else {
    fleet.nodes.push(nodeData);
  }

  fs.writeFileSync('/opt/fleet-manager/nodes.json', JSON.stringify(fleet, null, 2));
  res.json({ success: true });
});

// Load distribution endpoint
app.get('/fleet/distribute/:task', (req, res) => {
  const task = req.params.task;

  let nodes;
  try {
    nodes = JSON.parse(fs.readFileSync('/opt/fleet-manager/nodes.json', 'utf8'));
  } catch {
    nodes = { nodes: [] };
  }

  // Pick a random active node (simple load spreading, not strict round-robin)
  const activeNodes = nodes.nodes.filter(n => n.status === 'active');
  if (activeNodes.length === 0) {
    return res.status(503).json({ error: 'No active nodes available' });
  }

  const assignedNode = activeNodes[Math.floor(Math.random() * activeNodes.length)];
  res.json({
    task,
    assigned_node: assignedNode.node_id,
    node_ip: assignedNode.ip_address
  });
});

const PORT = process.env.PORT || 8080;
// Bind to localhost or Tailscale IP; do NOT expose fleet manager to the public internet
const BIND_HOST = process.env.BIND_HOST || '127.0.0.1';
app.listen(PORT, BIND_HOST, () => {
  console.log(`Fleet Manager running on ${BIND_HOST}:${PORT}`);
});
FLEET_MANAGER

# Install dependencies and start fleet manager
npm init -y
npm install express
chmod +x fleet-manager.js

# Create systemd service
cat > /etc/systemd/system/fleet-manager.service << 'SERVICE'
[Unit]
Description=OpenClaw Fleet Manager
After=network.target

[Service]
Type=simple
User=fleetmgr
WorkingDirectory=/opt/fleet-manager
ExecStart=/usr/bin/node fleet-manager.js
Restart=always
RestartSec=10
Environment=PORT=8080
# Workers can only register if BIND_HOST is reachable from them and they send
# the same FLEET_API_KEY. With the defaults, the service listens on 127.0.0.1
# and generates a new random key on every restart.
# Environment=BIND_HOST=<tailscale-ip>
# Environment=FLEET_API_KEY=<shared-secret>

[Install]
WantedBy=multi-user.target
SERVICE

# Set ownership so fleetmgr user can read/write
chown -R fleetmgr:fleetmgr /opt/fleet-manager

# Initialize the nodes registry before starting fleet-manager.
# The request handlers tolerate a missing file, but creating it now with the
# right owner guarantees the fleetmgr user can write to it later.
echo '{"nodes": []}' > /opt/fleet-manager/nodes.json
chown fleetmgr:fleetmgr /opt/fleet-manager/nodes.json

systemctl enable fleet-manager
systemctl start fleet-manager

# Install OpenClaw
curl -fsSL https://raw.githubusercontent.com/openclaw/openclaw/main/install.sh | bash

# Configure as primary node
mkdir -p /opt/openclaw/config
cat > /opt/openclaw/config/primary.json << 'CONFIG'
{
  "role": "primary",
  "fleet_manager": "http://localhost:8080",
  "node_discovery": true,
  "load_balancing": true
}
CONFIG

echo "✅ Primary node setup complete!"
PRIMARY_SETUP
)

    aleph instance create \
        --name "$node_name" \
        --image-ref "ubuntu:22.04" \
        --vcpus 4 \
        --memory 8192 \
        --disk-size 100 \
        --ssh-authorized-keys "$(cat ~/.aleph-deploy/keys/aleph_rsa.pub)" \
        --crn "$PRIMARY_CRN" \
        --setup-script "$setup_script"

    # Wait for deployment and get IP
    aleph instance status "$node_name" --wait
    local primary_ip=$(aleph instance get "$node_name" | jq -r '.networking.ipv4')

    # Update fleet config
    # Use mktemp to avoid race conditions with predictable tmp.json filenames,
    # and pass values with --arg so jq handles the quoting
    local tmpfile=$(mktemp)
    jq --arg name "$node_name" --arg ip "$primary_ip" \
        '.primary_node = {name: $name, ip: $ip}' \
        ~/.aleph-deploy/configs/fleet.json > "$tmpfile"
    mv "$tmpfile" ~/.aleph-deploy/configs/fleet.json

    echo "✅ Primary node deployed: $primary_ip"
    return 0
}

deploy_worker_node() {
    local node_id=$1
    local crn_url=$2
    local primary_ip=$3
    local node_name="${FLEET_NAME}-worker-${node_id}"

    echo "👷 Deploying Worker Node $node_id..."

    # Unquoted heredoc: $node_name, $primary_ip and $FLEET_API_KEY expand here on
    # the deploying machine; \$-escaped variables expand later on the worker.
    local setup_script=$(cat << WORKER_SETUP
#!/bin/bash
set -e

# Standard setup
apt-get update && apt-get upgrade -y
apt-get install -y curl wget git htop jq nodejs npm docker.io

# Install OpenClaw
curl -fsSL https://raw.githubusercontent.com/openclaw/openclaw/main/install.sh | bash

# Configure as worker node
mkdir -p /opt/openclaw/config
cat > /opt/openclaw/config/worker.json << 'CONFIG'
{
  "role": "worker",
  "primary_node": "$primary_ip",
  "node_id": "$node_name",
  "auto_register": true,
  "heartbeat_interval": 30
}
CONFIG

# Worker registration script
cat > /opt/register-worker.sh << 'REGISTER'
#!/bin/bash
NODE_ID="$node_name"
PRIMARY_IP="$primary_ip"
# Copied from the deploying machine's environment; must match the key the
# fleet manager on the primary node is running with
FLEET_API_KEY="${FLEET_API_KEY:-}"
LOCAL_IP=\$(curl -s http://checkip.amazonaws.com || hostname -I | awk '{print \$1}')

curl -X POST http://\$PRIMARY_IP:8080/fleet/register \
  -H "Content-Type: application/json" \
  -H "x-api-key: \$FLEET_API_KEY" \
  -d "{
    \"node_id\": \"\$NODE_ID\",
    \"ip_address\": \"\$LOCAL_IP\",
    \"capabilities\": [\"compute\", \"storage\", \"openclaw\"]
  }"
REGISTER

chmod +x /opt/register-worker.sh

# Register with primary node. Do not abort setup if the primary is not
# reachable yet; the heartbeat below retries every 30 seconds.
sleep 30
/opt/register-worker.sh || echo "Initial registration failed; heartbeat will retry"

# Setup heartbeat
cat > /opt/heartbeat.sh << 'HEARTBEAT'
#!/bin/bash
while true; do
    /opt/register-worker.sh
    sleep 30
done
HEARTBEAT

chmod +x /opt/heartbeat.sh

# Use systemd instead of nohup for supervised process management
cat > /etc/systemd/system/heartbeat.service << 'HB_SVC'
[Unit]
Description=Worker node heartbeat
After=network-online.target

[Service]
Type=simple
ExecStart=/opt/heartbeat.sh
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
HB_SVC

systemctl daemon-reload
systemctl enable heartbeat
systemctl start heartbeat

echo "✅ Worker node $node_id setup complete!"
WORKER_SETUP
)

    aleph instance create \
        --name "$node_name" \
        --image-ref "ubuntu:22.04" \
        --vcpus 2 \
        --memory 4096 \
        --disk-size 50 \
        --ssh-authorized-keys "$(cat ~/.aleph-deploy/keys/aleph_rsa.pub)" \
        --crn "$crn_url" \
        --setup-script "$setup_script"

    # Update fleet config. Workers deploy in parallel, so serialize the
    # read-modify-write with flock (util-linux) to avoid lost updates.
    local tmpfile
    (
        flock 9
        tmpfile=$(mktemp)
        jq --arg name "$node_name" --argjson id "$node_id" --arg crn "$crn_url" \
            '.worker_nodes += [{name: $name, id: $id, crn: $crn}]' \
            ~/.aleph-deploy/configs/fleet.json > "$tmpfile"
        mv "$tmpfile" ~/.aleph-deploy/configs/fleet.json
    ) 9> ~/.aleph-deploy/configs/fleet.json.lock

    echo "✅ Worker node $node_id deployed on $crn_url"
}

# Main deployment sequence
echo "📋 Starting fleet deployment sequence..."

# Deploy primary node first
deploy_primary_node
primary_ip=$(jq -r '.primary_node.ip' ~/.aleph-deploy/configs/fleet.json)

# Wait for primary node to be ready
echo "⏳ Waiting for primary node to initialize..."
sleep 60

# Deploy worker nodes
for i in $(seq 1 $((NODE_COUNT-1))); do
    crn_index=$(( (i - 1) % ${#WORKER_CRNS[@]} ))
    crn_url=${WORKER_CRNS[$crn_index]}

    deploy_worker_node "$i" "$crn_url" "$primary_ip" &

    # Stagger deployments to avoid overwhelming CRNs
    sleep 30
done

# Wait for all deployments to complete
wait

echo "🎉 Fleet deployment complete!"
echo "Primary Node: http://$primary_ip:8080"
echo "Fleet Status: curl -H \"x-api-key: \$FLEET_API_KEY\" http://$primary_ip:8080/fleet/status"

# Display fleet summary
jq . ~/.aleph-deploy/configs/fleet.json
```

### Fleet Management Commands

**Fleet Control Script:**
```bash
#!/bin/bash
# fleet-control.sh

FLEET_CONFIG="$HOME/.aleph-deploy/configs/fleet.json"

# All fleet manager endpoints require x-api-key authentication.
# Set FLEET_API_KEY in your environment or .env file.
FLEET_API_KEY="${FLEET_API_KEY:?FLEET_API_KEY env var is required}"

get_primary_ip() {
    jq -r '.primary_node.ip' "$FLEET_CONFIG"
}

fleet_status() {
    local primary_ip=$(get_primary_ip)

    echo "🔍 Fleet Status Check..."
    # Check curl's own exit status: in a pipeline only jq's status is seen,
    # and jq exits 0 on empty input, which would hide a failed request
    local response
    if ! response=$(curl -sf -H "x-api-key: $FLEET_API_KEY" "http://$primary_ip:8080/fleet/status"); then
        echo "❌ Unable to reach fleet manager"
        return 1
    fi
    echo "$response" | jq '.'
}

fleet_health() {
    echo "🏥 Fleet Health Check..."

    local primary_ip=$(get_primary_ip)
    local nodes=$(curl -s -H "x-api-key: $FLEET_API_KEY" "http://$primary_ip:8080/fleet/status" | jq -r '.nodes[].ip_address')

    for node_ip in $nodes; do
        echo "Checking node: $node_ip"

        if ssh -i ~/.aleph-deploy/keys/aleph_rsa -o ConnectTimeout=5 ubuntu@"$node_ip" "systemctl is-active openclaw" &>/dev/null; then
            echo "  ✅ $node_ip - OpenClaw running"
        else
            echo "  ❌ $node_ip - OpenClaw not responding"
        fi
    done
}

fleet_restart() {
    local service_name=$1

    # Validate service_name to prevent command injection via SSH
    if [[ ! "$service_name" =~ ^[a-zA-Z0-9_-]+$ ]]; then
        echo "❌ Invalid service name: $service_name"
        return 1
    fi

    echo "🔄 Restarting $service_name on all nodes..."

    local primary_ip=$(get_primary_ip)
    local nodes=$(curl -s -H "x-api-key: $FLEET_API_KEY" "http://$primary_ip:8080/fleet/status" | jq -r '.nodes[].ip_address')

    for node_ip in $nodes; do
        echo "Restarting $service_name on $node_ip..."
        ssh -i ~/.aleph-deploy/keys/aleph_rsa ubuntu@"$node_ip" "sudo systemctl restart $service_name"
    done
}

fleet_deploy() {
    local script_path=$1

    echo "📤 Deploying script to all nodes: $script_path"

    if [[ ! -f "$script_path" ]]; then
        echo "❌ Script file not found: $script_path"
        return 1
    fi

    local primary_ip=$(get_primary_ip)
    local nodes=$(curl -s -H "x-api-key: $FLEET_API_KEY" "http://$primary_ip:8080/fleet/status" | jq -r '.nodes[].ip_address')

    for node_ip in $nodes; do
        echo "Deploying to $node_ip..."
        scp -i ~/.aleph-deploy/keys/aleph_rsa "$script_path" ubuntu@"$node_ip":/tmp/deploy-script.sh
        ssh -i ~/.aleph-deploy/keys/aleph_rsa ubuntu@"$node_ip" "chmod +x /tmp/deploy-script.sh && sudo /tmp/deploy-script.sh"
    done
}

fleet_scale() {
    local target_nodes=$1
    local current_nodes=$(jq '.node_count' "$FLEET_CONFIG")

    echo "📊 Scaling fleet from $current_nodes to $target_nodes nodes..."
    if (( target_nodes > current_nodes )); then
        echo "🔺 Scaling up: adding $((target_nodes - current_nodes)) nodes"
        # Add scale-up logic
    elif (( target_nodes < current_nodes )); then
        echo "🔻 Scaling down: removing $((current_nodes - target_nodes)) nodes"
        # Add scale-down logic
    else
        echo "✅ Fleet already at target size"
    fi
}

fleet_logs() {
    local service_name="${1:-openclaw}"
    local lines="${2:-50}"

    # Validate inputs to prevent command injection via SSH
    if [[ ! "$service_name" =~ ^[a-zA-Z0-9_-]+$ ]]; then
        echo "❌ Invalid service name: $service_name"
        return 1
    fi
    if [[ ! "$lines" =~ ^[0-9]+$ ]]; then
        echo "❌ Invalid line count: $lines"
        return 1
    fi

    echo "📋 Collecting logs from all nodes..."

    local primary_ip=$(get_primary_ip)
    local nodes=$(curl -s -H "x-api-key: $FLEET_API_KEY" "http://$primary_ip:8080/fleet/status" | jq -r '.nodes[].ip_address')

    for node_ip in $nodes; do
        echo "=== Logs from $node_ip ==="
        ssh -i ~/.aleph-deploy/keys/aleph_rsa ubuntu@"$node_ip" "sudo journalctl -u $service_name -n $lines --no-pager"
        echo ""
    done
}

# Command dispatcher
case "${1:-status}" in
    "status")
        fleet_status
        ;;
    "health")
        fleet_health
        ;;
    "restart")
        fleet_restart "${2:-openclaw}"
        ;;
    "deploy")
        fleet_deploy "$2"
        ;;
    "scale")
        fleet_scale "$2"
        ;;
    "logs")
        fleet_logs "$2" "$3"
        ;;
    *)
        echo "Usage: $0 {status|health|restart|deploy|scale|logs}"
        echo ""
        echo "Commands:"
        echo "  status        - Show fleet status"
        echo "  health        - Check health of all nodes"
        echo "  restart [svc] - Restart service on all nodes"
        echo "  deploy