Files
HOA_Financial_Platform/backend/src/modules/investment-planning/investment-planning.service.ts
olsch01 9146118df1 feat: async AI calls, 10-min timeout, and failure messaging
- Make all AI endpoints (health scores + investment recommendations)
  fire-and-forget: POST returns immediately, frontend polls for results
- Extend AI API timeout from 2-5 min to 10 min for both services
- Add "last analysis failed — showing cached data" message to the
  Investment Recommendations panel (matches health score widgets)
- Add status/error_message columns to ai_recommendations table
- Remove nginx AI timeout overrides (no longer needed)
- Users can now navigate away during AI processing without interruption

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 14:42:53 -05:00

1210 lines
48 KiB
TypeScript

import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { TenantService } from '../../database/tenant.service';
import { DataSource } from 'typeorm';
// ── Interfaces ──
/**
 * One row of the per-account balance query (see getAccountBalances).
 * `balance` is carried as a string and parsed with parseFloat by consumers.
 */
export interface AccountBalance {
  id: string;
  account_number: string;
  name: string;
  account_type: string;
  fund_type: string;
  /** Null when the account has no rate recorded. */
  interest_rate: string | null;
  balance: string;
}
/**
 * One row of the active-investments query (see getInvestmentAccounts).
 * Monetary and rate fields are carried as strings; dates may be null.
 */
export interface InvestmentAccount {
  id: string;
  name: string;
  institution: string;
  investment_type: string;
  fund_type: string;
  principal: string;
  interest_rate: string;
  maturity_date: string | null;
  purchase_date: string | null;
  /** Preferred over `principal` when totals are computed. */
  current_value: string;
}
/**
 * One market-rate row read from shared.cd_rates (see getMarketRates).
 */
export interface MarketRate {
  bank_name: string;
  apy: string;
  min_deposit: string | null;
  term: string;
  term_months: number | null;
  /** Values queried by this service: 'cd', 'money_market', 'high_yield_savings'. */
  rate_type: string;
  fetched_at: string;
}
/**
 * A single recommendation in the AI response; mirrors the JSON schema
 * requested from the model in the system prompt.
 */
export interface Recommendation {
  type: 'cd_ladder' | 'new_investment' | 'reallocation' | 'maturity_action' | 'liquidity_warning' | 'general';
  priority: 'high' | 'medium' | 'low';
  title: string;
  summary: string;
  details: string;
  fund_type: 'operating' | 'reserve' | 'both';
  // The suggested_* fields and bank_name are optional.
  suggested_amount?: number;
  suggested_term?: string;
  suggested_rate?: number;
  bank_name?: string;
  rationale: string;
}
/**
 * Parsed AI output: the recommendation list plus an overall assessment
 * and a list of risk notes.
 */
export interface AIResponse {
  recommendations: Recommendation[];
  overall_assessment: string;
  risk_notes: string[];
}
/**
 * Shape returned by getSavedRecommendation(): the stored recommendation payload
 * together with the job status fields the UI uses.
 */
export interface SavedRecommendation {
  id: string;
  recommendations: Recommendation[];
  overall_assessment: string;
  risk_notes: string[];
  response_time_ms: number;
  created_at: string;
  status: 'processing' | 'complete' | 'error';
  /** True when the most recent attempt ended in error (even if older results are shown). */
  last_failed: boolean;
  error_message?: string;
}
@Injectable()
export class InvestmentPlanningService {
private readonly logger = new Logger(InvestmentPlanningService.name);
private debugEnabled: boolean;
constructor(
  private tenant: TenantService,
  private configService: ConfigService,
  private dataSource: DataSource,
) {
  // Set AI_DEBUG=true in .env to turn on detailed prompt/response logging.
  const debugFlag = this.configService.get<string>('AI_DEBUG');
  this.debugEnabled = debugFlag === 'true';
  if (!this.debugEnabled) {
    return;
  }
  this.logger.warn('AI DEBUG MODE ENABLED — prompts and responses will be logged');
}
/**
 * Emit a labelled debug log entry; a no-op unless AI_DEBUG is enabled.
 *
 * Strings are logged as-is and everything else is pretty-printed as JSON.
 * Values JSON cannot represent (undefined, functions, circular structures,
 * BigInt) fall back to String(data), so a debug call cannot throw into the
 * flow that invoked it.
 *
 * @param label Short tag identifying what is being logged.
 * @param data  Value to log.
 */
private debug(label: string, data: unknown) {
  if (!this.debugEnabled) return;
  const maxChars = 5000;
  let text: string;
  if (typeof data === 'string') {
    text = data;
  } else {
    try {
      // JSON.stringify yields undefined (not a string) for undefined/function/symbol input.
      text = JSON.stringify(data, null, 2) ?? String(data);
    } catch {
      // Circular references and BigInt values make JSON.stringify throw.
      text = String(data);
    }
  }
  // Truncate very long output to keep logs manageable
  const truncated = text.length > maxChars
    ? text.slice(0, maxChars) + `\n... [truncated, ${text.length} total chars]`
    : text;
  this.logger.log(`[AI_DEBUG] ${label}:\n${truncated}`);
}
// ── Public API Methods ──
/**
 * Assemble the financial snapshot used by the investment planning page.
 * Every underlying query runs through TenantService, so the data is tenant-scoped.
 *
 * @returns Summary totals plus the account, investment, budget, project and
 *          cash-flow data they were derived from.
 */
async getFinancialSnapshot() {
  const [accountBalances, investmentAccounts, budgets, projects, cashFlowContext] = await Promise.all([
    this.getAccountBalances(),
    this.getInvestmentAccounts(),
    this.getBudgets(),
    this.getProjects(),
    this.getCashFlowContext(),
  ]);

  // Cash for a fund = sum of its asset-account balances.
  const sumCash = (fund: string): number => {
    let total = 0;
    for (const acct of accountBalances) {
      if (acct.fund_type !== fund || acct.account_type !== 'asset') continue;
      total += parseFloat(acct.balance || '0');
    }
    return total;
  };

  // Investments are valued at current_value, falling back to principal.
  const sumInvestments = (fund: string): number => {
    let total = 0;
    for (const inv of investmentAccounts) {
      if (inv.fund_type !== fund) continue;
      total += parseFloat(inv.current_value || inv.principal || '0');
    }
    return total;
  };

  const operatingCash = sumCash('operating');
  const reserveCash = sumCash('reserve');
  const operatingInvestments = sumInvestments('operating');
  const reserveInvestments = sumInvestments('reserve');

  const summary = {
    operating_cash: operatingCash,
    reserve_cash: reserveCash,
    operating_investments: operatingInvestments,
    reserve_investments: reserveInvestments,
    total_operating: operatingCash + operatingInvestments,
    total_reserve: reserveCash + reserveInvestments,
    total_all: operatingCash + reserveCash + operatingInvestments + reserveInvestments,
  };

  return {
    summary,
    account_balances: accountBalances,
    investment_accounts: investmentAccounts,
    budgets,
    projects,
    cash_flow_context: cashFlowContext,
  };
}
/**
 * Load the newest market rates from the shared schema (not tenant-scoped).
 * For each rate type only rows from the most recent fetched_at batch are
 * returned, highest APY first, capped at 25 rows.
 *
 * @returns Rates keyed by type: cd, money_market, high_yield_savings.
 */
async getMarketRates(): Promise<{ cd: MarketRate[]; money_market: MarketRate[]; high_yield_savings: MarketRate[] }> {
  const runner = this.dataSource.createQueryRunner();
  try {
    await runner.connect();

    // Latest batch for one rate type = rows sharing that type's MAX(fetched_at).
    const loadLatestBatch = async (rateType: string): Promise<MarketRate[]> => {
      const params = [rateType];
      return runner.query(
        `SELECT bank_name, apy, min_deposit, term, term_months, rate_type, fetched_at
FROM shared.cd_rates
WHERE rate_type = $1
AND fetched_at = (
SELECT MAX(fetched_at) FROM shared.cd_rates WHERE rate_type = $1
)
ORDER BY apy DESC
LIMIT 25`,
        params,
      );
    };

    const rateTypes = ['cd', 'money_market', 'high_yield_savings'];
    const batches = await Promise.all(rateTypes.map((rateType) => loadLatestBatch(rateType)));

    return {
      cd: batches[0],
      money_market: batches[1],
      high_yield_savings: batches[2],
    };
  } finally {
    // Always hand the runner back, whether or not the queries succeeded.
    await runner.release();
  }
}
/**
 * Backward-compatible accessor returning only the CD slice of getMarketRates().
 */
async getCdRates(): Promise<MarketRate[]> {
  const { cd } = await this.getMarketRates();
  return cd;
}
/**
 * Ensure the status/error_message columns exist (for tenants created before this migration).
 *
 * Best-effort: failures never propagate to the caller, but they are logged at
 * debug level so that a missing table or a permission problem is diagnosable
 * instead of silently discarded.
 */
private async ensureStatusColumn(): Promise<void> {
  try {
    await this.tenant.query(
      `ALTER TABLE ai_recommendations ADD COLUMN IF NOT EXISTS status VARCHAR(20) DEFAULT 'complete'`,
    );
    await this.tenant.query(
      `ALTER TABLE ai_recommendations ADD COLUMN IF NOT EXISTS error_message TEXT`,
    );
  } catch (err: unknown) {
    // Deliberately non-fatal — e.g. the table may not exist yet for this tenant.
    const reason = err instanceof Error ? err.message : String(err);
    this.logger.debug(`Could not ensure ai_recommendations status columns: ${reason}`);
  }
}
/**
 * Load the most recent AI recommendation record for this tenant.
 *
 * - Latest row is 'processing': an empty placeholder with status 'processing'.
 * - Latest row is 'error': the newest 'complete' row (if any) flagged with
 *   last_failed = true, otherwise an empty error result.
 * - Anything else (including a missing status): the latest row as 'complete'.
 *
 * @returns The saved recommendation, or null when none exists or the lookup fails.
 */
async getSavedRecommendation(): Promise<SavedRecommendation | null> {
  try {
    await this.ensureStatusColumn();
    const latestRows = await this.tenant.query(
      `SELECT id, recommendations_json, overall_assessment, risk_notes,
response_time_ms, status, error_message, created_at
FROM ai_recommendations
ORDER BY created_at DESC
LIMIT 1`,
    );
    if (!latestRows || latestRows.length === 0) return null;
    const latest = latestRows[0];

    // Maps a row holding real results; dedicated columns win over the JSON payload.
    const fromResultRow = (source: any) => {
      const payload = source.recommendations_json || {};
      return {
        id: source.id,
        recommendations: payload.recommendations || [],
        overall_assessment: source.overall_assessment || payload.overall_assessment || '',
        risk_notes: source.risk_notes || payload.risk_notes || [],
        response_time_ms: source.response_time_ms || 0,
        created_at: source.created_at,
        status: 'complete' as const,
      };
    };

    switch (latest.status || 'complete') {
      case 'processing':
        // Job still running — nothing to show yet.
        return {
          id: latest.id,
          recommendations: [],
          overall_assessment: '',
          risk_notes: [],
          response_time_ms: 0,
          created_at: latest.created_at,
          status: 'processing',
          last_failed: false,
        };

      case 'error': {
        // Latest attempt failed — prefer the last successful result, flagged as stale.
        const previousGood = await this.tenant.query(
          `SELECT id, recommendations_json, overall_assessment, risk_notes,
response_time_ms, created_at
FROM ai_recommendations
WHERE status = 'complete'
ORDER BY created_at DESC
LIMIT 1`,
        );
        if (previousGood?.length) {
          return {
            ...fromResultRow(previousGood[0]),
            last_failed: true,
            error_message: latest.error_message,
          };
        }
        // Nothing to fall back on — surface the error itself.
        return {
          id: latest.id,
          recommendations: [],
          overall_assessment: latest.error_message || 'AI analysis failed. Please try again.',
          risk_notes: [],
          response_time_ms: 0,
          created_at: latest.created_at,
          status: 'error',
          last_failed: true,
          error_message: latest.error_message,
        };
      }

      default:
        // Completed run — return its data as stored.
        return { ...fromResultRow(latest), last_failed: false };
    }
  } catch (err: any) {
    // Table might not exist yet (pre-migration tenants)
    this.logger.warn(`Could not load saved recommendations: ${err.message}`);
    return null;
  }
}
/**
 * Insert a placeholder row with status 'processing' and return its id.
 *
 * @param userId Requesting user; stored as NULL when absent.
 */
private async saveProcessingRecord(userId?: string): Promise<string> {
  await this.ensureStatusColumn();
  const requestedBy = userId || null;
  const inserted = await this.tenant.query(
    `INSERT INTO ai_recommendations
(recommendations_json, overall_assessment, risk_notes, requested_by, status)
VALUES ('{}', '', '[]', $1, 'processing')
RETURNING id`,
    [requestedBy],
  );
  const firstRow = inserted[0];
  return firstRow.id;
}
/**
 * Fill a 'processing' row with the finished AI result and mark it 'complete'.
 * Storage failures are logged as warnings and not rethrown.
 *
 * @param jobId      Id of the row created by saveProcessingRecord().
 * @param aiResponse Parsed AI result to persist.
 * @param userId     Not used by this method; kept for signature compatibility.
 * @param elapsed    Total processing time in milliseconds.
 */
private async updateRecommendationComplete(
  jobId: string,
  aiResponse: AIResponse,
  userId: string | undefined,
  elapsed: number,
): Promise<void> {
  const fullPayload = JSON.stringify(aiResponse);
  const assessment = aiResponse.overall_assessment || '';
  const riskNotes = JSON.stringify(aiResponse.risk_notes || []);
  const values = [fullPayload, assessment, riskNotes, elapsed, jobId];
  try {
    await this.tenant.query(
      `UPDATE ai_recommendations
SET recommendations_json = $1,
overall_assessment = $2,
risk_notes = $3,
response_time_ms = $4,
status = 'complete'
WHERE id = $5`,
      values,
    );
  } catch (err: any) {
    this.logger.warn(`Could not update recommendation ${jobId}: ${err.message}`);
  }
}
/**
 * Mark a recommendation row as failed and store the error text.
 * Storage failures are logged as warnings and not rethrown.
 *
 * @param jobId        Id of the row to update.
 * @param errorMessage Message saved in error_message.
 */
private async updateRecommendationError(jobId: string, errorMessage: string): Promise<void> {
  const values = [errorMessage, jobId];
  try {
    await this.tenant.query(
      `UPDATE ai_recommendations
SET status = 'error',
error_message = $1
WHERE id = $2`,
      values,
    );
  } catch (err: any) {
    this.logger.warn(`Could not update recommendation error ${jobId}: ${err.message}`);
  }
}
/**
 * Kick off AI recommendations without waiting for them.
 * A 'processing' row is written first, the pipeline is started in the background,
 * and the caller gets an immediate acknowledgement.
 * The pending Promise holds a reference to this service (and its TenantService)
 * while the background work runs.
 *
 * @param userId Requesting user, if known.
 * @param orgId  Organization id forwarded to usage logging.
 * @returns A fixed `{ status: 'processing', message }` acknowledgement.
 */
async triggerAIRecommendations(userId?: string, orgId?: string): Promise<{ status: string; message: string }> {
  const jobId = await this.saveProcessingRecord(userId);
  this.logger.log(`AI recommendation triggered (job ${jobId}), starting background processing...`);

  // Intentionally not awaited; the catch handler keeps a failure from surfacing as an unhandled rejection.
  const backgroundRun = this.runBackgroundRecommendations(jobId, userId, orgId);
  backgroundRun.catch((err) => {
    this.logger.error(`Background AI recommendation failed (job ${jobId}): ${err.message}`);
  });

  const acknowledgement = {
    status: 'processing',
    message: 'AI analysis has been started. You can navigate away safely — results will appear when ready.',
  };
  return acknowledgement;
}
/**
 * Execute the whole recommendation pipeline for a previously created job row:
 * gather data, call the AI, then mark the row 'complete' or 'error'.
 * Any thrown error is logged and recorded on the job row.
 *
 * @param jobId  Id of the 'processing' row to finalize.
 * @param userId Requesting user, if known.
 * @param orgId  Organization id forwarded to usage logging.
 */
private async runBackgroundRecommendations(jobId: string, userId?: string, orgId?: string): Promise<void> {
  try {
    const startedAt = Date.now();
    const [snapshot, allRates, monthlyForecast] = await Promise.all([
      this.getFinancialSnapshot(),
      this.getMarketRates(),
      this.getMonthlyForecast(),
    ]);

    const { summary } = snapshot;
    this.debug('background_snapshot_summary', {
      job_id: jobId,
      operating_cash: summary.operating_cash,
      reserve_cash: summary.reserve_cash,
      total_all: summary.total_all,
      investment_accounts: snapshot.investment_accounts.length,
    });

    const promptMessages = this.buildPromptMessages(snapshot, allRates, monthlyForecast);
    const aiResponse = await this.callAI(promptMessages);
    const elapsed = Date.now() - startedAt;

    this.debug('background_final_response', {
      job_id: jobId,
      recommendation_count: aiResponse.recommendations.length,
      has_assessment: !!aiResponse.overall_assessment,
      elapsed_ms: elapsed,
    });

    // A "graceful" failure is an empty recommendation list whose assessment carries an error phrase.
    const assessment = aiResponse.overall_assessment;
    const hasNoRecommendations = aiResponse.recommendations.length === 0;
    const isGracefulError = hasNoRecommendations &&
      (assessment?.includes('Unable to generate') || assessment?.includes('invalid response'));

    if (!isGracefulError) {
      await this.updateRecommendationComplete(jobId, aiResponse, userId, elapsed);
    } else {
      await this.updateRecommendationError(jobId, aiResponse.overall_assessment);
    }

    // Usage logging is fire-and-forget.
    this.logAIUsage(userId, orgId, aiResponse, elapsed).catch(() => {});
    this.logger.log(`Background AI recommendation completed (job ${jobId}) in ${elapsed}ms`);
  } catch (err: any) {
    this.logger.error(`Background AI recommendation error (job ${jobId}): ${err.message}`);
    await this.updateRecommendationError(jobId, err.message);
  }
}
/**
 * Insert a finished AI result as a new 'complete' row in the tenant schema.
 * Storage failures are logged as warnings and not rethrown.
 * @deprecated Use triggerAIRecommendations() for async flow instead
 */
private async saveRecommendation(aiResponse: AIResponse, userId: string | undefined, elapsed: number): Promise<void> {
  try {
    await this.ensureStatusColumn();
    const values = [
      JSON.stringify(aiResponse),
      aiResponse.overall_assessment || '',
      JSON.stringify(aiResponse.risk_notes || []),
      userId || null,
      elapsed,
    ];
    await this.tenant.query(
      `INSERT INTO ai_recommendations
(recommendations_json, overall_assessment, risk_notes, requested_by, response_time_ms, status)
VALUES ($1, $2, $3, $4, $5, 'complete')`,
      values,
    );
  } catch (err: any) {
    // Persisting is optional; a storage failure must not break the recommendation flow.
    this.logger.warn(`Could not save recommendation: ${err.message}`);
  }
}
/**
 * Synchronous recommendation flow:
 * 1. Collect tenant-scoped financial data, shared market rates and the forecast
 * 2. Build the prompt from that context
 * 3. Call the AI API and receive structured recommendations
 * 4. Persist the result and log usage without waiting for either
 *
 * @param userId Requesting user, if known.
 * @param orgId  Organization id forwarded to usage logging.
 * @returns The AI response.
 */
async getAIRecommendations(userId?: string, orgId?: string): Promise<AIResponse> {
  this.debug('getAIRecommendations', 'Starting AI recommendation flow');
  const startedAt = Date.now();

  const [snapshot, allRates, monthlyForecast] = await Promise.all([
    this.getFinancialSnapshot(),
    this.getMarketRates(),
    this.getMonthlyForecast(),
  ]);

  const { summary } = snapshot;
  this.debug('snapshot_summary', {
    operating_cash: summary.operating_cash,
    reserve_cash: summary.reserve_cash,
    total_all: summary.total_all,
    investment_accounts: snapshot.investment_accounts.length,
    budgets: snapshot.budgets.length,
    projects: snapshot.projects.length,
    cd_rates: allRates.cd.length,
    money_market_rates: allRates.money_market.length,
    savings_rates: allRates.high_yield_savings.length,
    forecast_months: monthlyForecast.datapoints.length,
  });

  const promptMessages = this.buildPromptMessages(snapshot, allRates, monthlyForecast);
  const aiResponse = await this.callAI(promptMessages);
  const elapsed = Date.now() - startedAt;

  this.debug('final_response', {
    recommendation_count: aiResponse.recommendations.length,
    has_assessment: !!aiResponse.overall_assessment,
    risk_notes_count: aiResponse.risk_notes?.length || 0,
  });

  // Neither write is awaited: tenant storage first, then shared.ai_recommendation_log.
  this.saveRecommendation(aiResponse, userId, elapsed).catch(() => {});
  this.logAIUsage(userId, orgId, aiResponse, elapsed).catch(() => {});

  return aiResponse;
}
/**
 * Record one AI usage row in shared.ai_recommendation_log.
 *
 * Best-effort: a logging failure never propagates to the caller, but it is
 * reported as a warning (matching saveRecommendation) rather than discarded.
 *
 * @param userId   Requesting user, stored as NULL when absent.
 * @param orgId    Organization id, stored as NULL when absent.
 * @param response AI response; only the recommendation count is recorded.
 * @param elapsed  Response time in milliseconds.
 */
private async logAIUsage(userId: string | undefined, orgId: string | undefined, response: AIResponse, elapsed: number) {
  try {
    const schema = this.tenant.getSchema();
    const recommendationCount = response.recommendations?.length || 0;
    await this.dataSource.query(
      `INSERT INTO shared.ai_recommendation_log
(tenant_schema, organization_id, user_id, recommendation_count, response_time_ms, status)
VALUES ($1, $2, $3, $4, $5, $6)`,
      [
        schema || null,
        orgId || null,
        userId || null,
        recommendationCount,
        elapsed,
        recommendationCount > 0 ? 'success' : 'empty',
      ],
    );
  } catch (err: unknown) {
    // Non-critical — don't let logging failure break recommendations
    const reason = err instanceof Error ? err.message : String(err);
    this.logger.warn(`Could not log AI usage: ${reason}`);
  }
}
// ── Private: Tenant-Scoped Data Queries ──
/**
 * Load every active asset/liability/equity account with its current balance.
 *
 * Balance is debit − credit for asset accounts and credit − debit otherwise.
 * Only lines whose journal entry is posted and not void are counted: the
 * posted/void filter sits on a LEFT JOIN, so lines belonging to unposted or
 * voided entries still reach the aggregate (with je.* NULL) and must be
 * excluded inside the SUM. Accounts with no qualifying lines report 0.
 */
private async getAccountBalances(): Promise<AccountBalance[]> {
  return this.tenant.query(`
SELECT
a.id, a.account_number, a.name, a.account_type, a.fund_type,
a.interest_rate,
CASE
WHEN a.account_type IN ('asset', 'expense')
THEN COALESCE(SUM(CASE WHEN je.id IS NOT NULL THEN jel.debit END), 0)
- COALESCE(SUM(CASE WHEN je.id IS NOT NULL THEN jel.credit END), 0)
ELSE COALESCE(SUM(CASE WHEN je.id IS NOT NULL THEN jel.credit END), 0)
- COALESCE(SUM(CASE WHEN je.id IS NOT NULL THEN jel.debit END), 0)
END as balance
FROM accounts a
LEFT JOIN journal_entry_lines jel ON jel.account_id = a.id
LEFT JOIN journal_entries je ON je.id = jel.journal_entry_id
AND je.is_posted = true AND je.is_void = false
WHERE a.is_active = true
AND a.account_type IN ('asset', 'liability', 'equity')
GROUP BY a.id, a.account_number, a.name, a.account_type, a.fund_type, a.interest_rate
ORDER BY a.account_number
`);
}
/**
 * Load all active investment accounts, ordered by maturity date with
 * undated holdings last.
 */
private async getInvestmentAccounts(): Promise<InvestmentAccount[]> {
  const activeInvestmentsSql = `
SELECT
id, name, institution, investment_type, fund_type,
principal, interest_rate, maturity_date, purchase_date, current_value
FROM investment_accounts
WHERE is_active = true
ORDER BY maturity_date NULLS LAST
`;
  return this.tenant.query(activeInvestmentsSql);
}
/**
 * Load budget lines for the current calendar year, with the twelve monthly
 * amounts collapsed into a single annual_total per line.
 */
private async getBudgets() {
  const fiscalYear = new Date().getFullYear();
  const annualBudgetSql = `SELECT
b.fund_type, a.account_type, a.name, a.account_number,
(b.jan + b.feb + b.mar + b.apr + b.may + b.jun +
b.jul + b.aug + b.sep + b.oct + b.nov + b.dec_amt) as annual_total
FROM budgets b
JOIN accounts a ON a.id = b.account_id
WHERE b.fiscal_year = $1
ORDER BY a.account_type, a.account_number`;
  return this.tenant.query(annualBudgetSql, [fiscalYear]);
}
/**
 * Load active projects that are planned, approved or in progress,
 * ordered by target date and then priority.
 */
private async getProjects() {
  const openProjectsSql = `
SELECT
name, estimated_cost, target_year, target_month, fund_source,
status, priority, current_fund_balance, funded_percentage
FROM projects
WHERE is_active = true
AND status IN ('planned', 'approved', 'in_progress')
ORDER BY target_year, target_month NULLS LAST, priority
`;
  return this.tenant.query(openProjectsSql);
}
/**
 * Collect cash-flow context for the snapshot: current operating and reserve
 * cash, the current-year budget summarized by fund/account type, and the
 * recurring assessment income (regular assessment × active units per group).
 *
 * The four queries are independent, so they are issued together with
 * Promise.all (as getMonthlyForecast already does) instead of one at a time.
 */
private async getCashFlowContext() {
  const year = new Date().getFullYear();
  const [opCashResult, resCashResult, budgetSummary, assessmentIncome] = await Promise.all([
    // Current operating cash position
    this.tenant.query(`
SELECT COALESCE(SUM(sub.bal), 0) as total FROM (
SELECT COALESCE(SUM(jel.debit), 0) - COALESCE(SUM(jel.credit), 0) as bal
FROM accounts a
JOIN journal_entry_lines jel ON jel.account_id = a.id
JOIN journal_entries je ON je.id = jel.journal_entry_id
AND je.is_posted = true AND je.is_void = false
WHERE a.account_type = 'asset' AND a.fund_type = 'operating' AND a.is_active = true
GROUP BY a.id
) sub
`),
    // Current reserve cash position
    this.tenant.query(`
SELECT COALESCE(SUM(sub.bal), 0) as total FROM (
SELECT COALESCE(SUM(jel.debit), 0) - COALESCE(SUM(jel.credit), 0) as bal
FROM accounts a
JOIN journal_entry_lines jel ON jel.account_id = a.id
JOIN journal_entries je ON je.id = jel.journal_entry_id
AND je.is_posted = true AND je.is_void = false
WHERE a.account_type = 'asset' AND a.fund_type = 'reserve' AND a.is_active = true
GROUP BY a.id
) sub
`),
    // Annual budget summary by fund_type and account_type
    this.tenant.query(
      `SELECT
b.fund_type, a.account_type,
SUM(b.jan + b.feb + b.mar + b.apr + b.may + b.jun +
b.jul + b.aug + b.sep + b.oct + b.nov + b.dec_amt) as annual_total
FROM budgets b
JOIN accounts a ON a.id = b.account_id
WHERE b.fiscal_year = $1
GROUP BY b.fund_type, a.account_type`,
      [year],
    ),
    // Assessment income (monthly recurring revenue)
    this.tenant.query(`
SELECT
COALESCE(SUM(ag.regular_assessment * (SELECT COUNT(*) FROM units u WHERE u.assessment_group_id = ag.id AND u.status = 'active')), 0) as monthly_assessment_income
FROM assessment_groups ag
WHERE ag.is_active = true
`),
  ]);
  return {
    current_operating_cash: parseFloat(opCashResult[0]?.total || '0'),
    current_reserve_cash: parseFloat(resCashResult[0]?.total || '0'),
    budget_summary: budgetSummary,
    monthly_assessment_income: parseFloat(assessmentIncome[0]?.monthly_assessment_income || '0'),
  };
}
/**
 * Build a 12-month forward cash flow forecast for the AI.
 *
 * Starting from the current cash and investment totals, each month applies:
 *   income (budgeted income when > 0, otherwise assessment income)
 *   − budgeted expenses − capital project costs + investment maturity proceeds.
 * When an investment matures, its proceeds (value + simple interest) are added
 * to cash and its current_value is removed from the investment total.
 *
 * @returns `datapoints` — one entry per forecast month with running balances and
 *          the drivers behind them — and `assessment_schedule`, a per-group summary.
 */
private async getMonthlyForecast() {
  const now = new Date();
  const currentYear = now.getFullYear();
  const currentMonth = now.getMonth() + 1;
  const monthNames = ['jan','feb','mar','apr','may','jun','jul','aug','sep','oct','nov','dec_amt'];
  const monthLabels = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec'];
  const forecastMonths = 12;
  const round2 = (value: number): number => Math.round(value * 100) / 100;

  // ── 1) Current cash positions ──
  const [opCashRows, resCashRows, opInvRows, resInvRows] = await Promise.all([
    this.tenant.query(`
SELECT COALESCE(SUM(sub.bal), 0) as total FROM (
SELECT COALESCE(SUM(jel.debit), 0) - COALESCE(SUM(jel.credit), 0) as bal
FROM accounts a
JOIN journal_entry_lines jel ON jel.account_id = a.id
JOIN journal_entries je ON je.id = jel.journal_entry_id AND je.is_posted = true AND je.is_void = false
WHERE a.account_type = 'asset' AND a.fund_type = 'operating' AND a.is_active = true
GROUP BY a.id
) sub
`),
    this.tenant.query(`
SELECT COALESCE(SUM(sub.bal), 0) as total FROM (
SELECT COALESCE(SUM(jel.debit), 0) - COALESCE(SUM(jel.credit), 0) as bal
FROM accounts a
JOIN journal_entry_lines jel ON jel.account_id = a.id
JOIN journal_entries je ON je.id = jel.journal_entry_id AND je.is_posted = true AND je.is_void = false
WHERE a.account_type = 'asset' AND a.fund_type = 'reserve' AND a.is_active = true
GROUP BY a.id
) sub
`),
    this.tenant.query(`
SELECT COALESCE(SUM(current_value), 0) as total
FROM investment_accounts WHERE fund_type = 'operating' AND is_active = true
`),
    this.tenant.query(`
SELECT COALESCE(SUM(current_value), 0) as total
FROM investment_accounts WHERE fund_type = 'reserve' AND is_active = true
`),
  ]);
  let runOpCash = parseFloat(opCashRows[0]?.total || '0');
  let runResCash = parseFloat(resCashRows[0]?.total || '0');
  let runOpInv = parseFloat(opInvRows[0]?.total || '0');
  let runResInv = parseFloat(resInvRows[0]?.total || '0');

  // ── 2) Assessment income schedule (regular + special assessments) ──
  const assessmentGroups = await this.tenant.query(`
SELECT ag.frequency, ag.regular_assessment, ag.special_assessment,
(SELECT COUNT(*) FROM units u WHERE u.assessment_group_id = ag.id AND u.status = 'active') as unit_count
FROM assessment_groups ag WHERE ag.is_active = true
`);
  // Regular assessments are counted as operating income, special assessments as reserve income.
  const getAssessmentIncome = (month: number): { operating: number; reserve: number } => {
    let operating = 0;
    let reserve = 0;
    for (const g of assessmentGroups) {
      const units = parseInt(g.unit_count, 10) || 0;
      const regular = parseFloat(g.regular_assessment) || 0;
      const special = parseFloat(g.special_assessment) || 0;
      const freq = g.frequency || 'monthly';
      // Quarterly groups bill in Jan/Apr/Jul/Oct, annual groups in Jan; unknown frequencies never bill.
      let applies = false;
      if (freq === 'monthly') applies = true;
      else if (freq === 'quarterly') applies = [1, 4, 7, 10].includes(month);
      else if (freq === 'annual') applies = month === 1;
      if (applies) {
        operating += regular * units;
        reserve += special * units;
      }
    }
    return { operating, reserve };
  };

  // ── 3) Monthly budget data (income & expenses by month) ──
  const budgetsByYearMonth: Record<string, { opIncome: number; opExpense: number; resIncome: number; resExpense: number }> = {};
  // A 12-month window starting this month spans at most this year and the next.
  const budgetYears = [currentYear, currentYear + 1];
  const budgetRowsByYear = await Promise.all(
    budgetYears.map((yr) =>
      this.tenant.query(
        `SELECT b.fund_type, a.account_type,
b.jan, b.feb, b.mar, b.apr, b.may, b.jun,
b.jul, b.aug, b.sep, b.oct, b.nov, b.dec_amt
FROM budgets b
JOIN accounts a ON a.id = b.account_id
WHERE b.fiscal_year = $1`, [yr],
      ),
    ),
  );
  budgetYears.forEach((yr, yearIdx) => {
    const budgetRows = budgetRowsByYear[yearIdx];
    for (let m = 0; m < 12; m++) {
      const key = `${yr}-${m + 1}`;
      if (!budgetsByYearMonth[key]) budgetsByYearMonth[key] = { opIncome: 0, opExpense: 0, resIncome: 0, resExpense: 0 };
      const bucket = budgetsByYearMonth[key];
      for (const row of budgetRows) {
        const amt = parseFloat(row[monthNames[m]]) || 0;
        if (amt === 0) continue;
        // Any fund_type other than 'operating' is treated as reserve.
        const isOp = row.fund_type === 'operating';
        if (row.account_type === 'income') {
          if (isOp) bucket.opIncome += amt;
          else bucket.resIncome += amt;
        } else if (row.account_type === 'expense') {
          if (isOp) bucket.opExpense += amt;
          else bucket.resExpense += amt;
        }
      }
    }
  });

  // ── 4) Investment maturities ──
  const maturities = await this.tenant.query(`
SELECT fund_type, current_value, maturity_date, interest_rate, purchase_date
FROM investment_accounts
WHERE is_active = true AND maturity_date IS NOT NULL AND maturity_date > CURRENT_DATE
`);
  // Per month: cash proceeds (value + interest) and the underlying value leaving the investment total.
  type MaturityBucket = { operating: number; reserve: number; operatingValue: number; reserveValue: number };
  const emptyMaturity: MaturityBucket = { operating: 0, reserve: 0, operatingValue: 0, reserveValue: 0 };
  const maturityIndex: Record<string, MaturityBucket> = {};
  for (const inv of maturities) {
    // NOTE(review): if maturity_date arrives as a 'YYYY-MM-DD' string it parses as UTC while
    // getFullYear/getMonth are local, which could shift first-of-month dates — confirm driver behaviour.
    const matDate = new Date(inv.maturity_date);
    const key = `${matDate.getFullYear()}-${matDate.getMonth() + 1}`;
    if (!maturityIndex[key]) maturityIndex[key] = { ...emptyMaturity };
    const val = parseFloat(inv.current_value) || 0;
    const rate = parseFloat(inv.interest_rate) || 0;
    const purchaseDate = inv.purchase_date ? new Date(inv.purchase_date) : new Date();
    const daysHeld = Math.max((matDate.getTime() - purchaseDate.getTime()) / 86400000, 1);
    // Simple (non-compounding) interest over the holding period.
    const interestEarned = val * (rate / 100) * (daysHeld / 365);
    const maturityTotal = val + interestEarned;
    if (inv.fund_type === 'operating') {
      maturityIndex[key].operating += maturityTotal;
      maturityIndex[key].operatingValue += val;
    } else {
      maturityIndex[key].reserve += maturityTotal;
      maturityIndex[key].reserveValue += val;
    }
  }

  // ── 5) Capital project expenses ──
  // NOTE(review): getProjects() also includes status 'approved'; approved projects are not
  // costed here — confirm whether that difference is intended.
  const projectExpenses = await this.tenant.query(`
SELECT estimated_cost, target_year, target_month, fund_source
FROM projects
WHERE is_active = true AND status IN ('planned', 'in_progress')
AND target_year IS NOT NULL AND estimated_cost > 0
`);
  const projectIndex: Record<string, { operating: number; reserve: number }> = {};
  for (const p of projectExpenses) {
    const yr = parseInt(p.target_year, 10);
    // Projects without a target month are placed in June.
    const mo = parseInt(p.target_month, 10) || 6;
    const key = `${yr}-${mo}`;
    if (!projectIndex[key]) projectIndex[key] = { operating: 0, reserve: 0 };
    const cost = parseFloat(p.estimated_cost) || 0;
    if (p.fund_source === 'operating') projectIndex[key].operating += cost;
    else projectIndex[key].reserve += cost;
  }

  // ── 6) Build 12-month forward datapoints ──
  const datapoints: any[] = [];
  for (let i = 0; i < forecastMonths; i++) {
    const year = currentYear + Math.floor((currentMonth - 1 + i) / 12);
    const month = ((currentMonth - 1 + i) % 12) + 1;
    const key = `${year}-${month}`;
    const label = `${monthLabels[month - 1]} ${year}`;
    const assessments = getAssessmentIncome(month);
    const budget = budgetsByYearMonth[key] || { opIncome: 0, opExpense: 0, resIncome: 0, resExpense: 0 };
    const maturity = maturityIndex[key] || emptyMaturity;
    const project = projectIndex[key] || { operating: 0, reserve: 0 };
    // Use budget income if available, else assessment income
    const opIncomeMonth = budget.opIncome > 0 ? budget.opIncome : assessments.operating;
    const resIncomeMonth = budget.resIncome > 0 ? budget.resIncome : assessments.reserve;
    // Net change: income - expenses - project costs + maturity returns
    runOpCash += opIncomeMonth - budget.opExpense - project.operating + maturity.operating;
    runResCash += resIncomeMonth - budget.resExpense - project.reserve + maturity.reserve;
    // Remove the matured holdings' actual current_value from the investment totals
    // (the totals are SUM(current_value), so this keeps them exact rather than estimated).
    if (maturity.operatingValue > 0) runOpInv = Math.max(0, runOpInv - maturity.operatingValue);
    if (maturity.reserveValue > 0) runResInv = Math.max(0, runResInv - maturity.reserveValue);
    datapoints.push({
      month: label,
      operating_cash: round2(runOpCash),
      operating_investments: round2(runOpInv),
      reserve_cash: round2(runResCash),
      reserve_investments: round2(runResInv),
      // Include drivers for transparency
      op_income: round2(opIncomeMonth),
      op_expense: round2(budget.opExpense),
      res_income: round2(resIncomeMonth),
      res_expense: round2(budget.resExpense),
      project_cost_op: round2(project.operating),
      project_cost_res: round2(project.reserve),
      maturity_op: round2(maturity.operating),
      maturity_res: round2(maturity.reserve),
    });
  }

  // Build assessment schedule summary for the AI
  const assessmentSchedule = assessmentGroups.map((g: any) => {
    const regularPerUnit = parseFloat(g.regular_assessment) || 0;
    const specialPerUnit = parseFloat(g.special_assessment) || 0;
    const units = parseInt(g.unit_count, 10) || 0;
    return {
      frequency: g.frequency || 'monthly',
      regular_per_unit: regularPerUnit,
      special_per_unit: specialPerUnit,
      units,
      total_regular: regularPerUnit * units,
      total_special: specialPerUnit * units,
    };
  });
  return {
    datapoints,
    assessment_schedule: assessmentSchedule,
  };
}
// ── Private: AI Prompt Construction ──
private buildPromptMessages(
snapshot: any,
allRates: { cd: MarketRate[]; money_market: MarketRate[]; high_yield_savings: MarketRate[] },
monthlyForecast: any,
) {
const { summary, investment_accounts, budgets, projects, cash_flow_context } = snapshot;
const today = new Date().toISOString().split('T')[0];
const systemPrompt = `You are a financial advisor specializing in HOA (Homeowners Association) reserve fund management and conservative investment strategy. You provide fiduciary-grade investment recommendations.
CRITICAL RULES:
1. HOAs are legally required to maintain adequate reserves. NEVER recommend depleting reserve funds below safe levels.
2. HOA investments must be conservative ONLY: CDs, money market accounts, treasury bills, and high-yield savings. NO stocks, bonds, mutual funds, or speculative instruments.
3. Liquidity is paramount: always ensure enough cash to cover at least 3 months of operating expenses AND any capital project expenses due within the next 12 months.
4. CD laddering is the preferred strategy for reserve funds — it balances yield with regular liquidity access.
5. Operating funds should remain highly liquid (money market or high-yield savings only).
6. Respect the separation between operating funds and reserve funds. Never suggest commingling.
7. Base your recommendations ONLY on the available market rates (CDs, Money Market, High Yield Savings) provided. Do not reference rates or banks not in the provided data.
8. CRITICAL: Use the 12-MONTH CASH FLOW FORECAST to understand future liquidity. The forecast includes projected income (regular assessments AND special assessments collected from homeowners), budgeted expenses, investment maturities, and capital project costs. Do NOT flag liquidity shortfalls if the forecast shows sufficient income arriving before the expense is due.
9. When recommending money market or high yield savings accounts, focus on their liquidity advantages for operating funds. When recommending CDs, focus on their higher yields for longer-term reserve fund placement.
10. Compare current account rates against available market rates. If better rates are available, suggest specific moves with the potential additional interest income that could be earned.
RESPONSE FORMAT:
Respond with ONLY valid JSON (no markdown, no code fences) matching this exact schema:
{
"recommendations": [
{
"type": "cd_ladder" | "new_investment" | "reallocation" | "maturity_action" | "liquidity_warning" | "general",
"priority": "high" | "medium" | "low",
"title": "Short action title (under 60 chars)",
"summary": "One sentence summary of the recommendation",
"details": "Detailed explanation with specific dollar amounts and timeframes",
"fund_type": "operating" | "reserve" | "both",
"suggested_amount": 50000.00,
"suggested_term": "12 months",
"suggested_rate": 4.50,
"bank_name": "Bank name from market rates (if applicable)",
"rationale": "Financial reasoning for why this makes sense"
}
],
"overall_assessment": "2-3 sentence overview of the HOA's current investment position and opportunities",
"risk_notes": ["Array of risk items or concerns to flag for the board"]
}
IMPORTANT: Provide 3-7 actionable recommendations. Prioritize high-priority items (liquidity risks, maturing investments) before optimization opportunities. Include specific dollar amounts wherever possible. When there are opportunities for better rates on existing positions, quantify the additional annual interest that could be earned.`;
// Build the data context for the user prompt
const investmentsList = investment_accounts.length === 0
? 'No current investments.'
: investment_accounts.map((i: any) =>
`- ${i.name} | Type: ${i.investment_type} | Fund: ${i.fund_type} | Principal: $${parseFloat(i.principal).toFixed(2)} | Rate: ${parseFloat(i.interest_rate || '0').toFixed(2)}% | Maturity: ${i.maturity_date ? new Date(i.maturity_date).toLocaleDateString() : 'N/A'}`,
).join('\n');
const budgetLines = budgets.length === 0
? 'No budget data available.'
: budgets.map((b: any) =>
`- ${b.name} (${b.account_number}) | ${b.account_type}/${b.fund_type}: $${parseFloat(b.annual_total).toFixed(2)}/yr`,
).join('\n');
const projectLines = projects.length === 0
? 'No upcoming capital projects.'
: projects.map((p: any) =>
`- ${p.name} | Cost: $${parseFloat(p.estimated_cost).toFixed(2)} | Target: ${p.target_year || '?'}/${p.target_month || '?'} | Fund: ${p.fund_source} | Status: ${p.status} | Funded: ${parseFloat(p.funded_percentage || '0').toFixed(1)}%`,
).join('\n');
const budgetSummaryLines = (cash_flow_context.budget_summary || []).length === 0
? 'No budget summary available.'
: cash_flow_context.budget_summary.map((b: any) =>
`- ${b.fund_type} ${b.account_type}: $${parseFloat(b.annual_total).toFixed(2)}/yr (~$${(parseFloat(b.annual_total) / 12).toFixed(2)}/mo)`,
).join('\n');
// Format market rates by type
const formatRates = (rates: MarketRate[], typeLabel: string): string => {
if (rates.length === 0) return `No ${typeLabel} rate data available. Rate fetcher may not have been run yet.`;
return rates.map((r: MarketRate) => {
const termStr = r.term !== 'N/A' ? ` | Term: ${r.term}` : '';
return `- ${r.bank_name} | APY: ${parseFloat(String(r.apy)).toFixed(2)}%${termStr} | Min Deposit: ${r.min_deposit ? '$' + parseFloat(String(r.min_deposit)).toLocaleString() : 'N/A'}`;
}).join('\n');
};
const cdRateLines = formatRates(allRates.cd, 'CD');
const moneyMarketLines = formatRates(allRates.money_market, 'Money Market');
const savingsRateLines = formatRates(allRates.high_yield_savings, 'High Yield Savings');
// Format assessment schedule showing regular + special
const assessmentScheduleLines = (monthlyForecast.assessment_schedule || []).length === 0
? 'No assessment schedule available.'
: monthlyForecast.assessment_schedule.map((a: any) =>
`- ${a.frequency} collection | ${a.units} units | Regular: $${a.regular_per_unit.toFixed(2)}/unit ($${a.total_regular.toFixed(2)} total) → Operating | Special: $${a.special_per_unit.toFixed(2)}/unit ($${a.total_special.toFixed(2)} total) → Reserve`,
).join('\n');
// Format 12-month forecast table
const forecastLines = (monthlyForecast.datapoints || []).map((dp: any) => {
const drivers: string[] = [];
if (dp.op_income > 0) drivers.push(`OpInc:$${dp.op_income.toFixed(0)}`);
if (dp.op_expense > 0) drivers.push(`OpExp:$${dp.op_expense.toFixed(0)}`);
if (dp.res_income > 0) drivers.push(`ResInc:$${dp.res_income.toFixed(0)}`);
if (dp.res_expense > 0) drivers.push(`ResExp:$${dp.res_expense.toFixed(0)}`);
if (dp.project_cost_res > 0) drivers.push(`ResProjCost:$${dp.project_cost_res.toFixed(0)}`);
if (dp.project_cost_op > 0) drivers.push(`OpProjCost:$${dp.project_cost_op.toFixed(0)}`);
if (dp.maturity_op > 0) drivers.push(`OpMaturity:$${dp.maturity_op.toFixed(0)}`);
if (dp.maturity_res > 0) drivers.push(`ResMaturity:$${dp.maturity_res.toFixed(0)}`);
return `- ${dp.month} | OpCash: $${dp.operating_cash.toFixed(0)} | ResCash: $${dp.reserve_cash.toFixed(0)} | OpInv: $${dp.operating_investments.toFixed(0)} | ResInv: $${dp.reserve_investments.toFixed(0)} | Drivers: ${drivers.join(', ') || 'none'}`;
}).join('\n');
const userPrompt = `Analyze this HOA's financial position and provide investment recommendations.
TODAY'S DATE: ${today}
=== CURRENT CASH POSITIONS ===
Operating Cash (bank accounts): $${summary.operating_cash.toFixed(2)}
Reserve Cash (bank accounts): $${summary.reserve_cash.toFixed(2)}
Operating Investments: $${summary.operating_investments.toFixed(2)}
Reserve Investments: $${summary.reserve_investments.toFixed(2)}
Total Operating Fund: $${summary.total_operating.toFixed(2)}
Total Reserve Fund: $${summary.total_reserve.toFixed(2)}
Grand Total: $${summary.total_all.toFixed(2)}
=== CURRENT INVESTMENTS ===
${investmentsList}
=== ASSESSMENT INCOME SCHEDULE ===
${assessmentScheduleLines}
Note: "Regular" assessments fund Operating. "Special" assessments fund Reserve. Both are collected from homeowners per the frequency above.
=== ANNUAL BUDGET (${new Date().getFullYear()}) ===
${budgetLines}
=== BUDGET SUMMARY (Annual Totals by Category) ===
${budgetSummaryLines}
=== MONTHLY ASSESSMENT INCOME ===
Recurring monthly regular assessment income: $${cash_flow_context.monthly_assessment_income.toFixed(2)}/month (operating fund)
=== UPCOMING CAPITAL PROJECTS ===
${projectLines}
=== 12-MONTH CASH FLOW FORECAST (Projected) ===
This forecast shows month-by-month projected balances factoring in ALL income (regular assessments, special assessments, budgeted income), ALL expenses (budgeted expenses, capital project costs), and investment maturities.
${forecastLines}
=== AVAILABLE MARKET RATES ===
--- CD Rates ---
${cdRateLines}
--- Money Market Rates ---
${moneyMarketLines}
--- High Yield Savings Rates ---
${savingsRateLines}
Based on this complete financial picture INCLUDING the 12-month cash flow forecast, provide your investment recommendations. Consider:
1. Is there excess cash that could earn better returns in CDs, money market accounts, or high-yield savings?
2. Are any current investments maturing soon that need reinvestment planning?
3. Is the liquidity position adequate for upcoming expenses and projects? USE THE FORECAST to check — if income (including special assessments) arrives before expenses are due, the position may be adequate even if current cash seems low.
4. Would a CD ladder strategy improve the yield while maintaining access to funds?
5. Are operating and reserve funds properly separated in the investment strategy?
6. Could any current money market or savings accounts earn better rates at a different bank? Quantify the potential additional annual interest.
7. For operating funds that need to stay liquid, are money market or high-yield savings accounts being used optimally?`;
return [
{ role: 'system', content: systemPrompt },
{ role: 'user', content: userPrompt },
];
}
// ── Private: AI API Call ──
/**
 * Posts the prepared chat messages to `${AI_API_URL}/chat/completions` and
 * parses the model's reply into an AIResponse.
 *
 * Configuration is read from ConfigService: AI_API_URL and AI_MODEL fall back
 * to built-in defaults; a missing AI_API_KEY short-circuits with an
 * explanatory, empty-recommendation response (no request is made).
 *
 * Failures inside the request/parse phase (network error, idle timeout,
 * HTTP status >= 400, empty content, malformed JSON, missing
 * `recommendations` array) are logged and converted into a fallback
 * AIResponse with an empty `recommendations` list rather than rethrown.
 *
 * @param messages Chat messages to send; index 0 and 1 are logged as the
 *                 system and user prompts respectively.
 * @returns The parsed AI response, or a fallback response describing the failure.
 */
private async callAI(messages: Array<{ role: string; content: string }>): Promise<AIResponse> {
  const apiUrl = this.configService.get<string>('AI_API_URL') || 'https://integrate.api.nvidia.com/v1';
  const apiKey = this.configService.get<string>('AI_API_KEY');
  const model = this.configService.get<string>('AI_MODEL') || 'qwen/qwen3.5-397b-a17b';

  if (!apiKey) {
    this.logger.error('AI_API_KEY not configured');
    return {
      recommendations: [],
      overall_assessment: 'AI recommendations are not available. The AI_API_KEY has not been configured in the environment.',
      risk_notes: ['Configure AI_API_KEY in .env to enable investment recommendations.'],
    };
  }

  const requestBody = {
    model,
    messages,
    temperature: 0.3,
    max_tokens: 4096,
  };
  const bodyString = JSON.stringify(requestBody);
  // Byte length (not string length) is what Content-Length must carry.
  const bodyBytes = Buffer.byteLength(bodyString, 'utf-8');
  const endpoint = `${apiUrl}/chat/completions`;
  const timeoutMs = 600000; // 10 minutes

  this.debug('prompt_system', messages[0]?.content);
  this.debug('prompt_user', messages[1]?.content);
  this.debug('request_meta', {
    url: endpoint,
    model,
    temperature: 0.3,
    max_tokens: 4096,
    body_length_bytes: bodyBytes,
    message_count: messages.length,
  });

  try {
    this.logger.log(`Calling AI API: ${apiUrl} with model ${model} (body: ${bodyBytes} bytes)`);
    const startTime = Date.now();

    // Use Node.js https module instead of native fetch for better
    // compatibility in Docker Alpine environments
    const { URL } = await import('url');
    const https = await import('https');

    const aiResult = await new Promise<any>((resolve, reject) => {
      const url = new URL(endpoint);
      const options = {
        hostname: url.hostname,
        port: url.port || 443,
        // Keep any query string configured on AI_API_URL; pathname alone drops it.
        path: `${url.pathname}${url.search}`,
        method: 'POST',
        headers: {
          'Authorization': `Bearer ${apiKey}`,
          'Content-Type': 'application/json',
          'Content-Length': bodyBytes,
        },
        // Socket inactivity timeout; the 'timeout' event below only notifies,
        // so the request is destroyed manually there.
        timeout: timeoutMs,
      };

      const req = https.request(options, (res) => {
        // Decode as a UTF-8 stream so multi-byte characters that straddle a
        // chunk boundary are not corrupted by per-chunk string conversion.
        res.setEncoding('utf8');
        let data = '';
        res.on('data', (chunk) => { data += chunk; });
        // A failure while the body is streaming must settle the promise too.
        res.on('error', (err) => reject(err));
        res.on('end', () => {
          resolve({ status: res.statusCode, body: data });
        });
      });

      req.on('error', (err) => reject(err));
      req.on('timeout', () => {
        req.destroy();
        reject(new Error(`Request timed out after ${timeoutMs / 1000}s`));
      });

      req.write(bodyString);
      req.end();
    });

    const elapsed = Date.now() - startTime;
    this.logger.log(`AI API responded in ${elapsed}ms with status ${aiResult.status}`);
    this.debug('response_timing', { elapsed_ms: elapsed, status: aiResult.status });

    if (aiResult.status >= 400) {
      this.logger.error(`AI API error ${aiResult.status}: ${aiResult.body}`);
      this.debug('response_error_body', aiResult.body);
      throw new Error(`AI API returned ${aiResult.status}: ${aiResult.body}`);
    }

    const data = JSON.parse(aiResult.body);
    const msg = data.choices?.[0]?.message;
    // Thinking models (kimi-k2.5) may return content in 'content' or
    // spend all tokens on 'reasoning_content' with content=null
    const content = msg?.content || null;

    this.logger.log(`AI response: content=${content ? content.length + ' chars' : 'null'}, reasoning=${msg?.reasoning_content ? 'yes' : 'no'}, finish=${data.choices?.[0]?.finish_reason}`);
    this.debug('response_raw_content', content);
    this.debug('response_usage', data.usage);
    if (msg?.reasoning_content) {
      this.debug('response_reasoning', msg.reasoning_content);
    }

    if (!content) {
      throw new Error('AI model returned empty content — it may have exhausted tokens on reasoning. Try a non-thinking model or increase max_tokens.');
    }

    // Strip <think>...</think> blocks FIRST: a thinking model may emit them
    // ahead of a fenced JSON payload, and the fence check below only matches
    // when the fence is at the very start of the text.
    let cleaned = content.replace(/<think>[\s\S]*?<\/think>\s*/g, '').trim();
    // Then unwrap a markdown code fence, if the model added one.
    if (cleaned.startsWith('```')) {
      cleaned = cleaned.replace(/^```(?:json)?\s*\n?/, '').replace(/\n?```\s*$/, '').trim();
    }

    const parsed = JSON.parse(cleaned) as AIResponse;

    // Validate the response structure
    if (!parsed.recommendations || !Array.isArray(parsed.recommendations)) {
      this.debug('invalid_response_structure', parsed);
      throw new Error('Invalid AI response: missing recommendations array');
    }

    this.logger.log(`AI returned ${parsed.recommendations.length} recommendations`);
    this.debug('parsed_recommendations', parsed.recommendations.map((r) => ({
      type: r.type,
      priority: r.priority,
      title: r.title,
      fund_type: r.fund_type,
      suggested_amount: r.suggested_amount,
    })));

    return parsed;
  } catch (error: any) {
    // Log the full error chain for debugging
    this.logger.error(`AI recommendation failed: ${error.message}`);
    if (error.cause) {
      this.logger.error(` → cause: ${error.cause?.message || error.cause}`);
      this.debug('error_cause', {
        message: error.cause?.message,
        code: error.cause?.code,
        errno: error.cause?.errno,
        syscall: error.cause?.syscall,
        hostname: error.cause?.hostname,
        stack: error.cause?.stack,
      });
    }
    this.debug('error_full', {
      message: error.message,
      name: error.name,
      code: error.code,
      stack: error.stack,
    });

    // JSON.parse failures (API envelope or model payload) surface as SyntaxError
    if (error instanceof SyntaxError) {
      return {
        recommendations: [],
        overall_assessment: 'The AI service returned an invalid response format. Please try again.',
        risk_notes: [`Response parsing error: ${error.message}`],
      };
    }

    // For network/timeout errors, return a graceful fallback
    return {
      recommendations: [],
      overall_assessment: 'Unable to generate AI recommendations at this time. Please try again later.',
      risk_notes: [`AI service error: ${error.message}`],
    };
  }
}
}