// Python Dev

Как организовать обзвон очереди через SIP и Python

Опубликовано 01.04.2026

Заметка разбирает архитектуру автоматического обзвона: как организован конвейер обработки, как работает дозвон через Asterisk AMI и как замыкается цикл от генерации речи до фиксации результата.


Архитектура: конвейер из очередей

Система построена как конвейер с явными стадиями — генерация аудио, звонок, сохранение, распознавание ответа. Каждый клиент проходит их последовательно, но стадии работают параллельно друг другу: пока один звонок идёт, для других уже готовится аудио и пишутся результаты.

Для каждой стадии — своя очередь и пул воркеров:

self.queues  = { "generate_voice_message": DelayedQueue(), "create_call": DelayedQueue(), ... }
self.workers = { "generate_voice_message": 5, "create_call": 1, "recognition": 3, "store_data": 10 }
self.stages  = {
    CallStatus.CREATED:   'generate_voice_message',
    CallStatus.GENERATED: 'create_call',
    CallStatus.PENDING:   'recognition'
}

Переход между стадиями управляется через CallStatus: статус звонка определяет, в какую очередь он попадёт следующей. DelayedQueue позволяет откладывать повторные попытки без отдельного планировщика — задержка передаётся прямо в момент постановки задачи.


Подключение к Asterisk AMI

Сервис поднимает panoramisk.Manager и подписывается на события канала — Hangup и BridgeEnter. Обработка обзвона не стартует, пока AMI недоступен: перед запуском воркеров идёт цикл с проверкой через Ping.


От текста к звонку

Первый этап — генерация аудио. Текст сообщения (индивидуальный или шаблонный из БД) отправляется в Yandex Cloud SpeechKit, на выходе — файл с именем uuid4().hex, который сохраняется на сервере в директории, доступной Asterisk. Имя файла дальше путешествует вместе с объектом звонка и в нужный момент передаётся как переменная dialplan — Asterisk сам найдёт и проиграет его абоненту.

Когда аудио готово, воркер create_call формирует AMI Originate:

action = {
    'Action':   'Originate',
    'Channel':  f'PJSIP/11{cl.client.phone}@{trunk}',
    'Context':  'applications',
    'Exten':    2200101,
    'Priority': 1,
    'Async':    'true',
    'Variable': f'file_name={cl.generated},file_record={cl.recorded},lang={lang_value}'
}

Через Variable в dialplan передаются три вещи: какой файл проигрывать, куда писать запись разговора и какую языковую ветку сценария использовать. Async: true означает, что AMI не блокируется в ожидании ответа абонента — команда уходит и сразу возвращает ответ.


Ожидание завершения и повторные попытки

После успешного Originate воркер сохраняет идентификатор канала и встаёт в ожидание: он крутится в цикле с asyncio.sleep(1), пока обработчик события Hangup не сбросит этот идентификатор в пустую строку. Это простой, но рабочий способ синхронизировать асинхронный AMI-поток с логикой воркера.

При Failure в ответе — звонок снова кладётся в ту же очередь через DelayedQueue с задержкой из конфига. Лимит — 6 попыток, после чего статус фиксируется как FAILED.


Сохранение, распознавание и финал

После каждого изменения статуса store_data делает upsert в БД и по значению статуса решает, что дальше: финальный статус (SUCCESSFUL / FAILED) — обновить строку в Google Sheet, любой другой — положить в следующую очередь по таблице self.stages.

Последний этап — recognition — читает WAV-файл записи и отправляет его в SpeechKit STT. Если файл не найден или слишком мал, сразу выставляется FAILED. Иначе — по результату распознавания фиксируется финальный статус, и цикл для этого клиента закрывается.


Когда это применимо

Описанный подход хорошо ложится на любую задачу массовых исходящих уведомлений: напоминания о записи, подтверждения заказов, обзвон должников, голосовые рассылки по сегменту. Ключевое условие — наличие Asterisk с настроенным AMI и SIP-транком. Всё остальное — Python, очереди и облачный TTS/STT — заменяется под конкретный стек.

Подход на основе state machine и очередей с задержкой показывает себя надёжно там, где важна устойчивость к сбоям: упал TTS — задача подождёт и повторится, не ответил абонент — звонок уйдёт на повтор через нужный интервал, не дожидаясь ручного вмешательства. Вся логика переходов, повторов и параллелизма сосредоточена в трёх структурах — self.stages, self.workers и DelayedQueue — и легко читается как единая схема системы.

// Python Dev

Другие статьи Python Dev

Все статьи

// Python Projects

Проекты Python Dev

Все проекты

// Contact

Нужна помощь?

Свяжись со мной и я помогу решить проблему