feat: async AI calls, 10-min timeout, and failure messaging
- Make all AI endpoints (health scores + investment recommendations) fire-and-forget: POST returns immediately, frontend polls for results - Extend AI API timeout from 2-5 min to 10 min for both services - Add "last analysis failed — showing cached data" message to the Investment Recommendations panel (matches health score widgets) - Add status/error_message columns to ai_recommendations table - Remove nginx AI timeout overrides (no longer needed) - Users can now navigate away during AI processing without interruption Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
import { Controller, Get, Post, UseGuards, Req } from '@nestjs/common';
|
||||
import { Controller, Get, Post, UseGuards, Req, Logger } from '@nestjs/common';
|
||||
import { ApiTags, ApiBearerAuth, ApiOperation } from '@nestjs/swagger';
|
||||
import { JwtAuthGuard } from '../auth/guards/jwt-auth.guard';
|
||||
import { AllowViewer } from '../../common/decorators/allow-viewer.decorator';
|
||||
@@ -9,6 +9,8 @@ import { HealthScoresService } from './health-scores.service';
|
||||
@ApiBearerAuth()
|
||||
@UseGuards(JwtAuthGuard)
|
||||
export class HealthScoresController {
|
||||
private readonly logger = new Logger(HealthScoresController.name);
|
||||
|
||||
constructor(private service: HealthScoresService) {}
|
||||
|
||||
@Get('latest')
|
||||
@@ -19,32 +21,56 @@ export class HealthScoresController {
|
||||
}
|
||||
|
||||
@Post('calculate')
|
||||
@ApiOperation({ summary: 'Trigger both health score recalculations (used by scheduler)' })
|
||||
@ApiOperation({ summary: 'Trigger both health score recalculations (async — returns immediately)' })
|
||||
@AllowViewer()
|
||||
async calculate(@Req() req: any) {
|
||||
const schema = req.user?.orgSchema;
|
||||
const [operating, reserve] = await Promise.all([
|
||||
|
||||
// Fire-and-forget — background processing saves results to DB
|
||||
Promise.all([
|
||||
this.service.calculateScore(schema, 'operating'),
|
||||
this.service.calculateScore(schema, 'reserve'),
|
||||
]);
|
||||
return { operating, reserve };
|
||||
]).catch((err) => {
|
||||
this.logger.error(`Background health score calculation failed: ${err.message}`);
|
||||
});
|
||||
|
||||
return {
|
||||
status: 'processing',
|
||||
message: 'Health score calculations started. Results will appear when ready.',
|
||||
};
|
||||
}
|
||||
|
||||
@Post('calculate/operating')
|
||||
@ApiOperation({ summary: 'Recalculate operating fund health score only' })
|
||||
@ApiOperation({ summary: 'Trigger operating fund health score recalculation (async)' })
|
||||
@AllowViewer()
|
||||
async calculateOperating(@Req() req: any) {
|
||||
const schema = req.user?.orgSchema;
|
||||
const operating = await this.service.calculateScore(schema, 'operating');
|
||||
return { operating };
|
||||
|
||||
// Fire-and-forget
|
||||
this.service.calculateScore(schema, 'operating').catch((err) => {
|
||||
this.logger.error(`Background operating score failed: ${err.message}`);
|
||||
});
|
||||
|
||||
return {
|
||||
status: 'processing',
|
||||
message: 'Operating fund health score calculation started.',
|
||||
};
|
||||
}
|
||||
|
||||
@Post('calculate/reserve')
|
||||
@ApiOperation({ summary: 'Recalculate reserve fund health score only' })
|
||||
@ApiOperation({ summary: 'Trigger reserve fund health score recalculation (async)' })
|
||||
@AllowViewer()
|
||||
async calculateReserve(@Req() req: any) {
|
||||
const schema = req.user?.orgSchema;
|
||||
const reserve = await this.service.calculateScore(schema, 'reserve');
|
||||
return { reserve };
|
||||
|
||||
// Fire-and-forget
|
||||
this.service.calculateScore(schema, 'reserve').catch((err) => {
|
||||
this.logger.error(`Background reserve score failed: ${err.message}`);
|
||||
});
|
||||
|
||||
return {
|
||||
status: 'processing',
|
||||
message: 'Reserve fund health score calculation started.',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1115,7 +1115,7 @@ Projected Year-End Total (Cash + Investments): $${data.projectedYearEndTotal.toF
|
||||
'Content-Type': 'application/json',
|
||||
'Content-Length': Buffer.byteLength(bodyString, 'utf-8'),
|
||||
},
|
||||
timeout: 120000,
|
||||
timeout: 600000, // 10 minute timeout
|
||||
};
|
||||
|
||||
const req = https.request(options, (res) => {
|
||||
@@ -1129,7 +1129,7 @@ Projected Year-End Total (Cash + Investments): $${data.projectedYearEndTotal.toF
|
||||
req.on('error', (err) => reject(err));
|
||||
req.on('timeout', () => {
|
||||
req.destroy();
|
||||
reject(new Error('Request timed out after 120s'));
|
||||
reject(new Error('Request timed out after 600s'));
|
||||
});
|
||||
|
||||
req.write(bodyString);
|
||||
|
||||
@@ -36,9 +36,9 @@ export class InvestmentPlanningController {
|
||||
}
|
||||
|
||||
@Post('recommendations')
|
||||
@ApiOperation({ summary: 'Get AI-powered investment recommendations' })
|
||||
@ApiOperation({ summary: 'Trigger AI-powered investment recommendations (async — returns immediately)' })
|
||||
@AllowViewer()
|
||||
getRecommendations(@Req() req: any) {
|
||||
return this.service.getAIRecommendations(req.user?.sub, req.user?.orgId);
|
||||
triggerRecommendations(@Req() req: any) {
|
||||
return this.service.triggerAIRecommendations(req.user?.sub, req.user?.orgId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,6 +65,9 @@ export interface SavedRecommendation {
|
||||
risk_notes: string[];
|
||||
response_time_ms: number;
|
||||
created_at: string;
|
||||
status: 'processing' | 'complete' | 'error';
|
||||
last_failed: boolean;
|
||||
error_message?: string;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
@@ -196,14 +199,33 @@ export class InvestmentPlanningService {
|
||||
return rates.cd;
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure the status/error_message columns exist (for tenants created before this migration).
|
||||
*/
|
||||
private async ensureStatusColumn(): Promise<void> {
|
||||
try {
|
||||
await this.tenant.query(
|
||||
`ALTER TABLE ai_recommendations ADD COLUMN IF NOT EXISTS status VARCHAR(20) DEFAULT 'complete'`,
|
||||
);
|
||||
await this.tenant.query(
|
||||
`ALTER TABLE ai_recommendations ADD COLUMN IF NOT EXISTS error_message TEXT`,
|
||||
);
|
||||
} catch {
|
||||
// Ignore — column may already exist or table may not exist
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the latest saved AI recommendation for this tenant.
|
||||
* Returns status and last_failed flag for UI state management.
|
||||
*/
|
||||
async getSavedRecommendation(): Promise<SavedRecommendation | null> {
|
||||
try {
|
||||
await this.ensureStatusColumn();
|
||||
|
||||
const rows = await this.tenant.query(
|
||||
`SELECT id, recommendations_json, overall_assessment, risk_notes,
|
||||
response_time_ms, created_at
|
||||
response_time_ms, status, error_message, created_at
|
||||
FROM ai_recommendations
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1`,
|
||||
@@ -212,6 +234,64 @@ export class InvestmentPlanningService {
|
||||
if (!rows || rows.length === 0) return null;
|
||||
|
||||
const row = rows[0];
|
||||
const status = row.status || 'complete';
|
||||
|
||||
// If still processing, return processing status
|
||||
if (status === 'processing') {
|
||||
return {
|
||||
id: row.id,
|
||||
recommendations: [],
|
||||
overall_assessment: '',
|
||||
risk_notes: [],
|
||||
response_time_ms: 0,
|
||||
created_at: row.created_at,
|
||||
status: 'processing',
|
||||
last_failed: false,
|
||||
};
|
||||
}
|
||||
|
||||
// If latest attempt failed, return the last successful result with last_failed flag
|
||||
if (status === 'error') {
|
||||
const lastGood = await this.tenant.query(
|
||||
`SELECT id, recommendations_json, overall_assessment, risk_notes,
|
||||
response_time_ms, created_at
|
||||
FROM ai_recommendations
|
||||
WHERE status = 'complete'
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1`,
|
||||
);
|
||||
|
||||
if (lastGood?.length) {
|
||||
const goodRow = lastGood[0];
|
||||
const recData = goodRow.recommendations_json || {};
|
||||
return {
|
||||
id: goodRow.id,
|
||||
recommendations: recData.recommendations || [],
|
||||
overall_assessment: goodRow.overall_assessment || recData.overall_assessment || '',
|
||||
risk_notes: goodRow.risk_notes || recData.risk_notes || [],
|
||||
response_time_ms: goodRow.response_time_ms || 0,
|
||||
created_at: goodRow.created_at,
|
||||
status: 'complete',
|
||||
last_failed: true,
|
||||
error_message: row.error_message,
|
||||
};
|
||||
}
|
||||
|
||||
// No previous good result — return error state
|
||||
return {
|
||||
id: row.id,
|
||||
recommendations: [],
|
||||
overall_assessment: row.error_message || 'AI analysis failed. Please try again.',
|
||||
risk_notes: [],
|
||||
response_time_ms: 0,
|
||||
created_at: row.created_at,
|
||||
status: 'error',
|
||||
last_failed: true,
|
||||
error_message: row.error_message,
|
||||
};
|
||||
}
|
||||
|
||||
// Complete — return the data normally
|
||||
const recData = row.recommendations_json || {};
|
||||
return {
|
||||
id: row.id,
|
||||
@@ -220,6 +300,8 @@ export class InvestmentPlanningService {
|
||||
risk_notes: row.risk_notes || recData.risk_notes || [],
|
||||
response_time_ms: row.response_time_ms || 0,
|
||||
created_at: row.created_at,
|
||||
status: 'complete',
|
||||
last_failed: false,
|
||||
};
|
||||
} catch (err: any) {
|
||||
// Table might not exist yet (pre-migration tenants)
|
||||
@@ -228,15 +310,153 @@ export class InvestmentPlanningService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Save a 'processing' placeholder record and return its ID.
|
||||
*/
|
||||
private async saveProcessingRecord(userId?: string): Promise<string> {
|
||||
await this.ensureStatusColumn();
|
||||
const rows = await this.tenant.query(
|
||||
`INSERT INTO ai_recommendations
|
||||
(recommendations_json, overall_assessment, risk_notes, requested_by, status)
|
||||
VALUES ('{}', '', '[]', $1, 'processing')
|
||||
RETURNING id`,
|
||||
[userId || null],
|
||||
);
|
||||
return rows[0].id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update a processing record with completed results.
|
||||
*/
|
||||
private async updateRecommendationComplete(
|
||||
jobId: string,
|
||||
aiResponse: AIResponse,
|
||||
userId: string | undefined,
|
||||
elapsed: number,
|
||||
): Promise<void> {
|
||||
try {
|
||||
await this.tenant.query(
|
||||
`UPDATE ai_recommendations
|
||||
SET recommendations_json = $1,
|
||||
overall_assessment = $2,
|
||||
risk_notes = $3,
|
||||
response_time_ms = $4,
|
||||
status = 'complete'
|
||||
WHERE id = $5`,
|
||||
[
|
||||
JSON.stringify(aiResponse),
|
||||
aiResponse.overall_assessment || '',
|
||||
JSON.stringify(aiResponse.risk_notes || []),
|
||||
elapsed,
|
||||
jobId,
|
||||
],
|
||||
);
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`Could not update recommendation ${jobId}: ${err.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update a processing record with error status.
|
||||
*/
|
||||
private async updateRecommendationError(jobId: string, errorMessage: string): Promise<void> {
|
||||
try {
|
||||
await this.tenant.query(
|
||||
`UPDATE ai_recommendations
|
||||
SET status = 'error',
|
||||
error_message = $1
|
||||
WHERE id = $2`,
|
||||
[errorMessage, jobId],
|
||||
);
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`Could not update recommendation error ${jobId}: ${err.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Trigger AI recommendations asynchronously.
|
||||
* Saves a 'processing' record, starts the AI work in the background, and returns immediately.
|
||||
* The TenantService instance remains alive via closure reference for the duration of the background work.
|
||||
*/
|
||||
async triggerAIRecommendations(userId?: string, orgId?: string): Promise<{ status: string; message: string }> {
|
||||
const jobId = await this.saveProcessingRecord(userId);
|
||||
this.logger.log(`AI recommendation triggered (job ${jobId}), starting background processing...`);
|
||||
|
||||
// Fire-and-forget — the Promise keeps this service instance (and TenantService) alive
|
||||
this.runBackgroundRecommendations(jobId, userId, orgId).catch((err) => {
|
||||
this.logger.error(`Background AI recommendation failed (job ${jobId}): ${err.message}`);
|
||||
});
|
||||
|
||||
return {
|
||||
status: 'processing',
|
||||
message: 'AI analysis has been started. You can navigate away safely — results will appear when ready.',
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the full AI recommendation pipeline in the background.
|
||||
*/
|
||||
private async runBackgroundRecommendations(jobId: string, userId?: string, orgId?: string): Promise<void> {
|
||||
try {
|
||||
const startTime = Date.now();
|
||||
|
||||
const [snapshot, allRates, monthlyForecast] = await Promise.all([
|
||||
this.getFinancialSnapshot(),
|
||||
this.getMarketRates(),
|
||||
this.getMonthlyForecast(),
|
||||
]);
|
||||
|
||||
this.debug('background_snapshot_summary', {
|
||||
job_id: jobId,
|
||||
operating_cash: snapshot.summary.operating_cash,
|
||||
reserve_cash: snapshot.summary.reserve_cash,
|
||||
total_all: snapshot.summary.total_all,
|
||||
investment_accounts: snapshot.investment_accounts.length,
|
||||
});
|
||||
|
||||
const messages = this.buildPromptMessages(snapshot, allRates, monthlyForecast);
|
||||
const aiResponse = await this.callAI(messages);
|
||||
const elapsed = Date.now() - startTime;
|
||||
|
||||
this.debug('background_final_response', {
|
||||
job_id: jobId,
|
||||
recommendation_count: aiResponse.recommendations.length,
|
||||
has_assessment: !!aiResponse.overall_assessment,
|
||||
elapsed_ms: elapsed,
|
||||
});
|
||||
|
||||
// Check if the AI returned a graceful error (empty recommendations with error message)
|
||||
const isGracefulError = aiResponse.recommendations.length === 0 &&
|
||||
(aiResponse.overall_assessment?.includes('Unable to generate') ||
|
||||
aiResponse.overall_assessment?.includes('invalid response'));
|
||||
|
||||
if (isGracefulError) {
|
||||
await this.updateRecommendationError(jobId, aiResponse.overall_assessment);
|
||||
} else {
|
||||
await this.updateRecommendationComplete(jobId, aiResponse, userId, elapsed);
|
||||
}
|
||||
|
||||
// Log AI usage (fire-and-forget)
|
||||
this.logAIUsage(userId, orgId, aiResponse, elapsed).catch(() => {});
|
||||
|
||||
this.logger.log(`Background AI recommendation completed (job ${jobId}) in ${elapsed}ms`);
|
||||
} catch (err: any) {
|
||||
this.logger.error(`Background AI recommendation error (job ${jobId}): ${err.message}`);
|
||||
await this.updateRecommendationError(jobId, err.message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Save AI recommendation result to tenant schema.
|
||||
* @deprecated Use triggerAIRecommendations() for async flow instead
|
||||
*/
|
||||
private async saveRecommendation(aiResponse: AIResponse, userId: string | undefined, elapsed: number): Promise<void> {
|
||||
try {
|
||||
await this.ensureStatusColumn();
|
||||
await this.tenant.query(
|
||||
`INSERT INTO ai_recommendations
|
||||
(recommendations_json, overall_assessment, risk_notes, requested_by, response_time_ms)
|
||||
VALUES ($1, $2, $3, $4, $5)`,
|
||||
(recommendations_json, overall_assessment, risk_notes, requested_by, response_time_ms, status)
|
||||
VALUES ($1, $2, $3, $4, $5, 'complete')`,
|
||||
[
|
||||
JSON.stringify(aiResponse),
|
||||
aiResponse.overall_assessment || '',
|
||||
@@ -873,7 +1093,7 @@ Based on this complete financial picture INCLUDING the 12-month cash flow foreca
|
||||
'Content-Type': 'application/json',
|
||||
'Content-Length': Buffer.byteLength(bodyString, 'utf-8'),
|
||||
},
|
||||
timeout: 300000, // 5 minute timeout
|
||||
timeout: 600000, // 10 minute timeout
|
||||
};
|
||||
|
||||
const req = https.request(options, (res) => {
|
||||
@@ -887,7 +1107,7 @@ Based on this complete financial picture INCLUDING the 12-month cash flow foreca
|
||||
req.on('error', (err) => reject(err));
|
||||
req.on('timeout', () => {
|
||||
req.destroy();
|
||||
reject(new Error(`Request timed out after 300s`));
|
||||
reject(new Error(`Request timed out after 600s`));
|
||||
});
|
||||
|
||||
req.write(bodyString);
|
||||
|
||||
Reference in New Issue
Block a user