Skip to content

Commit 4043a0d

Browse files
Fix: Improve Telegram Service Connection Handling (#7619)
* fix: improve telegram service connection handling - Add connection timeout and reconnection limits - Improve error handling for specific scenarios - Clean up event handlers and intervals properly - Add proper TTL management for connections - Prevent infinite reconnection loops - Add exponential backoff for reconnection attempts Co-Authored-By: [email protected] <[email protected]> * fix: make clearTTL public to fix build error Co-Authored-By: [email protected] <[email protected]> * chore: fix code formatting Co-Authored-By: [email protected] <[email protected]> * chore: fix formatting with rush fast-format Co-Authored-By: [email protected] <[email protected]> * chore: fix formatting with rush fast-format Co-Authored-By: [email protected] <[email protected]> --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: [email protected] <[email protected]>
1 parent 3bee65d commit 4043a0d

File tree

1 file changed

+166
-28
lines changed

1 file changed

+166
-28
lines changed

services/telegram/pod-telegram/src/telegram.ts

Lines changed: 166 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import { StringSession } from 'telegram/sessions'
1010
import type { Dialog } from 'telegram/tl/custom/dialog'
1111
import config from './config'
1212
import { ApiError, Code } from './error'
13+
// Using built-in types instead of node:timers for better compatibility
14+
type Timeout = ReturnType<typeof setTimeout>
1315

1416
Logger.setLevel('none')
1517

@@ -107,8 +109,16 @@ class TelegramConnection {
107109
private readonly listeners = new Map<number, Listener>()
108110
private subID = 0
109111
private handlerAdded = false
110-
private readinessInterval: NodeJS.Timeout | undefined
112+
private readinessInterval: Timeout | undefined
111113
private connecting = false
114+
private reconnectAttempts = 0
115+
private static readonly MAX_RECONNECT_ATTEMPTS = 3
116+
private static readonly RECONNECT_TIMEOUT = 5000 // 5 seconds timeout for connection attempts
117+
private static readonly RECONNECT_BACKOFF = 1000 // Add 1 second delay between retries
118+
119+
private delay (ms: number): Promise<void> {
120+
return new Promise((resolve) => setTimeout(resolve, ms))
121+
}
112122

113123
constructor (
114124
readonly client: TelegramClient,
@@ -128,21 +138,79 @@ class TelegramConnection {
128138
}
129139
}
130140

141+
private async connectWithTimeout (): Promise<void> {
142+
const timeoutId = setTimeout(() => {
143+
// Force disconnect if we timeout
144+
void this.client.disconnect()
145+
throw new Error('Connection attempt timed out')
146+
}, TelegramConnection.RECONNECT_TIMEOUT)
147+
148+
try {
149+
await this.client.connect()
150+
} finally {
151+
clearTimeout(timeoutId)
152+
}
153+
}
154+
131155
async tryReconnect (): Promise<void> {
132156
if (this.connecting) {
133157
return
134158
}
135159

136160
if (this.client.connected === true) {
137161
console.log('Already connected')
162+
this.reconnectAttempts = 0 // Reset counter on successful connection
163+
return
164+
}
165+
166+
if (this.reconnectAttempts >= TelegramConnection.MAX_RECONNECT_ATTEMPTS) {
167+
console.error(`Max reconnection attempts (${TelegramConnection.MAX_RECONNECT_ATTEMPTS}) reached`)
168+
// Reset counter but wait for next interval
169+
this.reconnectAttempts = 0
138170
return
139171
}
140172

141173
try {
142174
this.connecting = true
143-
await this.client.connect()
175+
await this.connectWithTimeout()
176+
this.reconnectAttempts = 0 // Reset on successful connection
144177
} catch (e: unknown) {
145-
console.error(e)
178+
this.reconnectAttempts++
179+
180+
// Handle specific error types
181+
if (e instanceof RPCError) {
182+
// Handle Telegram RPC-specific errors
183+
const rpcError = e
184+
console.error(`Telegram RPC error during connection attempt ${this.reconnectAttempts}: ${rpcError.message}`)
185+
if (rpcError.message.includes('AUTH_KEY_UNREGISTERED') || rpcError.message.includes('SESSION_REVOKED')) {
186+
// Authentication errors - need to re-authenticate
187+
this._signInFlow = undefined // Force re-authentication
188+
throw new ApiError(Code.PhoneCodeInvalid, 'Re-authentication required') // Use existing auth error code
189+
}
190+
} else if (e instanceof Error) {
191+
const error = e
192+
if (error.message.includes('Connection attempt timed out')) {
193+
console.error(`Connection timeout on attempt ${this.reconnectAttempts}`)
194+
} else if (error.message.includes('ECONNREFUSED') || error.message.includes('ENETUNREACH')) {
195+
console.error(`Network error on attempt ${this.reconnectAttempts}: ${error.message}`)
196+
} else {
197+
console.error(`Connection attempt ${this.reconnectAttempts} failed: ${error.message}`)
198+
}
199+
} else {
200+
console.error(`Connection attempt ${this.reconnectAttempts} failed with unknown error`)
201+
}
202+
203+
// Add exponential backoff delay using callback
204+
const backoffDelay = TelegramConnection.RECONNECT_BACKOFF * Math.pow(2, this.reconnectAttempts - 1)
205+
await this.delay(backoffDelay)
206+
207+
// If we've hit max attempts, emit a more specific error
208+
if (this.reconnectAttempts >= TelegramConnection.MAX_RECONNECT_ATTEMPTS) {
209+
throw new ApiError(
210+
Code.PhoneCodeInvalid,
211+
`Failed to connect after ${TelegramConnection.MAX_RECONNECT_ATTEMPTS} attempts`
212+
) // Use existing error code
213+
}
146214
} finally {
147215
this.connecting = false
148216
}
@@ -156,25 +224,57 @@ class TelegramConnection {
156224
await this.tryReconnect()
157225
}
158226
} catch (err) {
159-
console.log(err)
227+
if (err instanceof Error) {
228+
console.error(`Reconnection error: ${err.message}`)
229+
} else {
230+
console.error('Unknown reconnection error occurred')
231+
}
160232
}
161233
}
162234

163-
async close (): Promise<void> {
164-
if (this.readinessInterval !== undefined) {
165-
clearInterval(this.readinessInterval)
166-
}
235+
private clearHandlers (): void {
236+
// Clear all event handlers to prevent memory leaks
167237
this.client.listEventHandlers().forEach(([builder, callback]) => {
168238
this.client.removeEventHandler(callback, builder)
169239
})
170-
await this.client.disconnect()
240+
this.handlerAdded = false
241+
this.listeners.clear()
242+
this.subID = 0
243+
}
244+
245+
private clearIntervals (): void {
246+
if (this.readinessInterval !== undefined) {
247+
clearInterval(this.readinessInterval)
248+
this.readinessInterval = undefined
249+
}
250+
}
251+
252+
async close (): Promise<void> {
253+
this.clearIntervals()
254+
this.clearHandlers()
255+
256+
try {
257+
await this.client.disconnect()
258+
} catch (e) {
259+
// Log but don't throw as we're cleaning up
260+
console.error('Error during disconnect:', e instanceof Error ? e.message : 'Unknown error')
261+
}
171262
}
172263

173264
async signOut (): Promise<void> {
174-
await this.client.invoke(new Api.auth.LogOut())
265+
try {
266+
await this.client.invoke(new Api.auth.LogOut())
267+
} finally {
268+
// Always clean up resources even if logout fails
269+
await this.close()
270+
this._signInFlow = undefined
271+
}
175272
}
176273

177274
async signIn (): Promise<void> {
275+
// Clear any existing handlers before starting new sign in
276+
this.clearHandlers()
277+
178278
this._signInFlow = new SignInFlow(this.client, this.phone, () => {
179279
this._signInFlow = undefined
180280
this.client.session.save()
@@ -328,17 +428,22 @@ export const telegram = new (class TelegramHelper {
328428
readonly ttls = new Map<string, NodeJS.Timeout>()
329429

330430
async auth (phone: string): Promise<SignInState> {
431+
// Clear any existing TTL timeout for this phone
432+
this.clearTTL(phone)
433+
331434
const conn = await this.getOrCreate(phone)
332435

333-
if (!this.ttls.has(phone)) {
334-
this.ttls.set(
335-
phone,
336-
setTimeout(() => {
337-
this.conns.delete(phone)
338-
void conn.close()
339-
}, config.TelegramAuthTTL)
340-
)
341-
}
436+
// Set new TTL timeout
437+
this.ttls.set(
438+
phone,
439+
setTimeout(() => {
440+
console.log(`TTL expired for connection ${phone}, cleaning up...`)
441+
this.forgetConnection(phone)
442+
void conn.close().catch((e) => {
443+
console.error(`Error during TTL cleanup for ${phone}:`, e instanceof Error ? e.message : 'Unknown error')
444+
})
445+
}, config.TelegramAuthTTL)
446+
)
342447

343448
if (conn.signInFlow !== undefined) {
344449
return conn.signInFlow.state
@@ -347,23 +452,50 @@ export const telegram = new (class TelegramHelper {
347452
try {
348453
await conn.signIn()
349454
} catch (err) {
455+
// On error, clean up everything
350456
this.forgetConnection(phone)
351-
await conn.close()
457+
await conn.close().catch((e) => {
458+
console.error(`Error during error cleanup for ${phone}:`, e instanceof Error ? e.message : 'Unknown error')
459+
})
352460

353461
throw err
354462
}
355463

356464
return 'code'
357465
}
358466

467+
clearTTL (phone: string): void {
468+
const existingTTL = this.ttls.get(phone)
469+
if (existingTTL !== undefined) {
470+
clearTimeout(existingTTL)
471+
this.ttls.delete(phone)
472+
}
473+
}
474+
359475
async authCode (phone: string, code: string): Promise<boolean> {
360476
const conn = this.conns.get(phone)
361477

362478
if (conn?.signInFlow === undefined) {
363479
throw Error('Sign in is not initialized')
364480
}
365481

366-
return await conn.signInFlow.code(code)
482+
try {
483+
const needsPassword = await conn.signInFlow.code(code)
484+
485+
if (!needsPassword) {
486+
// Authentication completed successfully, reset TTL
487+
this.clearTTL(phone)
488+
}
489+
490+
return needsPassword
491+
} catch (err) {
492+
// On authentication error, clean up
493+
this.forgetConnection(phone)
494+
await conn.close().catch((e) => {
495+
console.error(`Error during auth code cleanup for ${phone}:`, e instanceof Error ? e.message : 'Unknown error')
496+
})
497+
throw err
498+
}
367499
}
368500

369501
async authPass (phone: string, pass: string): Promise<void> {
@@ -373,7 +505,18 @@ export const telegram = new (class TelegramHelper {
373505
throw Error('Sign in is not initialized')
374506
}
375507

376-
await conn.signInFlow.pass(pass)
508+
try {
509+
await conn.signInFlow.pass(pass)
510+
// Authentication completed successfully, reset TTL
511+
this.clearTTL(phone)
512+
} catch (err) {
513+
// On authentication error, clean up
514+
this.forgetConnection(phone)
515+
await conn.close().catch((e) => {
516+
console.error(`Error during auth pass cleanup for ${phone}:`, e instanceof Error ? e.message : 'Unknown error')
517+
})
518+
throw err
519+
}
377520
}
378521

379522
async getOrCreate (phone: string): Promise<TelegramConnection> {
@@ -395,12 +538,7 @@ export const telegram = new (class TelegramHelper {
395538

396539
forgetConnection (phone: string): void {
397540
this.conns.delete(phone)
398-
const timeout = this.ttls.get(phone)
399-
400-
if (timeout !== undefined) {
401-
this.ttls.delete(phone)
402-
clearTimeout(timeout)
403-
}
541+
this.clearTTL(phone)
404542
}
405543

406544
async create (phone: string, token?: string): Promise<TelegramConnection> {

0 commit comments

Comments
 (0)