Chat e Streaming (Socket.IO)
Questa sezione descrive come il Node Agent gestisce la chat in tempo reale e lo streaming delle risposte tramite Socket.IO, inclusa la creazione dei thread e la validazione JWT.
Autenticazione Socket
/main/src/server/socketServer.ts
const validateToken = (socket: CustomSocket, next: (err?: Error) => void) => {
const token = socket.handshake.auth.token
if (!token) return next(new Error('Authentication token is required'))
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET ?? '') as jwt.JwtPayload & {
companyId: string
}
socket.companyId = decoded.companyId
next()
} catch (error) {
return next(new Error('Invalid token'))
}
}
Inizializzazione server Socket.IO
/main/src/server/socketServer.ts
export function initializeSocketServer(httpServer: HttpServer) {
const io = new Server(httpServer, { cors: { origin: '*', methods: ['GET', 'POST'] } })
io.use(validateToken)
io.on('connection', (socket: CustomSocket) => {
logger.info('New client connected', { socketId: socket.id, companyId: socket.companyId })
/* handlers ... */
})
return io
}
Client: esempi Socket.IO
Client JS (Socket.IO)
import { io } from 'socket.io-client'
const socket = io(BASE_URL, { auth: { token: JWT } })
socket.on('connect', () => console.log('connected', socket.id))
socket.on('typing', d => console.log('typing', d))
socket.on('chat_response_start', () => console.log('start'))
socket.on('chat_response_chunk', ({ response }) => console.log('chunk', response))
socket.on('chat_response_end', () => console.log('end'))
socket.on('error', e => console.error('error', e))
socket.emit('create_thread', { agentId: 'agent_123', type: 'agent' })
socket.emit('chat_message', {
message: 'Ciao! Mostrami scarpe nere 42',
threadId: 'thread_abc',
collection_name: 'company123_agentABC',
agentId: 'agent_123',
companyName: 'Shop',
companyDescription: 'Ecommerce',
language: 'italian',
hasDocuments: false,
websiteUrl: 'https://shop.example.com',
laravelAppUrl: 'https://app.example.com',
extraInstructions: '',
assistantInstructions: '',
type: 'agent',
})
Streaming della risposta (eventi Socket)
/main/src/server/socketServer.ts
socket.on('chat_message', async data => {
if (!socket.companyId) return socket.emit('error', { error: 'Unauthorized' })
const { message, threadId, collection_name, agentId /* ... */ } = data
socket.emit('typing', { isTyping: true })
const response = await chatApp.stream(
{ messages: [new HumanMessage(message)] },
{
configurable: {
thread_id: threadId,
agent_id: agentId,
collection_name /* ... */,
} as CustomConfigurable,
streamMode: 'messages',
}
)
let firstChunk = false
for await (const message of response) {
if (
(message[0] instanceof AIMessageChunk || message[0] instanceof AIMessage) &&
message[0].content.length > 0
) {
if (!firstChunk) {
firstChunk = true
socket.emit('typing', { isTyping: false })
socket.emit('chat_response_start')
}
socket.emit('chat_response_chunk', { response: message[0].content })
}
}
socket.emit('chat_response_end')
})
Creazione thread
- Via Socket: evento
create_thread(crea thread e registra conversazione in Laravel) - Via HTTP: endpoint dedicato.
/main/src/server/serverHttp.ts
app.post('/create-thread/:agentId', async (req, res) => {
const { agentId } = req.params
const { type } = req.query
const threadId = await chatService.createThread(agentId)
res.json({ threadId })
laravelService.createConversation(
{
assistant_or_agent_uuid: agentId,
conversation_uuid: threadId,
date_time: new Date().toISOString(),
sender: 'user',
},
type as AssistantType
)
})