Telegram 机器人的 TON Connect - Python
在本教程中,我们将创建一个示例 Telegram 机器人,该机器人支持使用 Python TON Connect SDK pytonconnect 的 TON Connect 2.0 认证。 我们将分析连接钱包、发送交易、获取有关已连接钱包的数据以及断开钱包的连接。
打开演示机器人
查看 GitHub
准备工作
安装库
我们将使用 Python 库 aiogram 3.0 来编写机器人。
要将 TON Connect 集成到您的 Telegram 机器人中,您需要安装 pytonconnect 包。
此外,为了使用 TON 原语并解析用户地址,我们还需要 pytoniq-core。
您可以使用 pip 来完成此操作:
pip install aiogram pytoniq-core python-dotenv
pip install pytonconnect
设置配置
在 .env 文件中指定机器人令牌和 TON Connect 清单文件的链接,然后在 config.py 中加载它们:
# .env
TOKEN='1111111111:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA' # 在此处填写您的机器人令牌
MANIFEST_URL='https://raw.githubusercontent.com/XaBbl4/pytonconnect/main/pytonconnect-manifest.json'
# config.py
from os import environ as env
from dotenv import load_dotenv
# Load the variables defined in the .env file into the environment.
load_dotenv()
# Both settings are mandatory: indexing env raises KeyError if one is missing.
TOKEN = env['TOKEN']
MANIFEST_URL = env['MANIFEST_URL']
创建简单机器人
创建 main.py
文件,其中将包含主要机器人代码:
# main.py
import sys
import logging
import asyncio
import config
from aiogram import Bot, Dispatcher, F
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart, Command
from aiogram.types import Message, CallbackQuery
# Module-level logger; __name__ is the conventional logger name (a file path
# would make every log line carry the full path of the script).
logger = logging.getLogger(__name__)
# Dispatcher on which the handlers below are registered.
dp = Dispatcher()
# Bot client created from the configured token, with HTML as parse mode.
bot = Bot(config.TOKEN, parse_mode=ParseMode.HTML)
@dp.message(CommandStart())
async def command_start_handler(message: Message):
    """Reply to the /start command with a fixed greeting."""
    greeting = 'Hi!'
    await message.answer(text=greeting)
async def main() -> None:
    """Remove the webhook, dropping pending updates, then start polling."""
    # Dropping pending updates plays the role of the old skip_updates=True.
    skip_pending = True
    await bot.delete_webhook(drop_pending_updates=skip_pending)
    await dp.start_polling(bot)
if __name__ == "__main__":
    # Send INFO-and-above log records to stdout before the event loop starts.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    asyncio.run(main())
钱包连接
TON Connect 存储
让我们为 TON Connect 创建简单的存储
# tc_storage.py
from pytonconnect.storage import IStorage, DefaultStorage
storage = {}
class TcStorage(IStorage):
    """Per-chat key-value storage for TON Connect kept in the module-level dict.

    Every key is prefixed with the chat id so that entries belonging to
    different chats do not collide inside the shared ``storage`` dictionary.
    """

    def __init__(self, chat_id: int):
        self.chat_id = chat_id

    def _get_key(self, key: str) -> str:
        """Return ``key`` prefixed with this instance's chat id."""
        return str(self.chat_id) + key

    async def set_item(self, key: str, value: str) -> None:
        """Store ``value`` under the chat-scoped ``key``."""
        storage[self._get_key(key)] = value

    async def get_item(self, key: str, default_value: str = None):
        """Return the stored value, or ``default_value`` when the key is absent."""
        return storage.get(self._get_key(key), default_value)

    async def remove_item(self, key: str) -> None:
        """Delete the chat-scoped ``key``; removing a missing key is a no-op."""
        # pop() with a default avoids a KeyError when nothing was stored.
        storage.pop(self._get_key(key), None)
连接处理器
首先,我们需要一个为每个用户返回不同实例的函数:
# connector.py
from pytonconnect import TonConnect
import config
from tc_storage import TcStorage
def get_connector(chat_id: int):
    """Build a TonConnect instance whose storage is scoped to ``chat_id``."""
    chat_storage = TcStorage(chat_id)
    return TonConnect(config.MANIFEST_URL, storage=chat_storage)
其次,让我们在 command_start_handler()
中添加连接处理器:
# main.py
@dp.message(CommandStart())
async def command_start_handler(message: Message):
    """Show the action menu to connected users, or the wallet picker otherwise."""
    connector = get_connector(message.chat.id)
    is_connected = await connector.restore_connection()
    keyboard = InlineKeyboardBuilder()

    if is_connected:
        keyboard.button(text='Send Transaction', callback_data='send_tr')
        keyboard.button(text='Disconnect', callback_data='disconnect')
        await message.answer(text='You are already connected!', reply_markup=keyboard.as_markup())
        return

    # No restored session: offer one button per available wallet.
    for entry in TonConnect.get_wallets():
        wallet_name = entry['name']
        keyboard.button(text=wallet_name, callback_data=f'connect:{wallet_name}')
    # One button per row.
    keyboard.adjust(1)
    await message.answer(text='Choose wallet to connect', reply_markup=keyboard.as_markup())
现在,对于尚未连接钱包的用户,机器人会发送带有所有可用钱包按钮的消息。
因此,我们需要编写一个函数来处理 connect:{wallet["name"]} 回调:
# main.py
async def connect_wallet(message: Message, wallet_name: str):
    """Send a connect link for ``wallet_name`` and wait for the wallet to connect.

    The connector is polled once per second for up to three minutes. When the
    wallet connects, its address is reported in non-bounceable form; when the
    time runs out, a timeout message is sent instead.

    Raises:
        Exception: if ``wallet_name`` is not among the connector's wallets.
    """
    connector = get_connector(message.chat.id)

    # Pick the wallet description whose name matches the pressed button.
    wallet = next(
        (w for w in connector.get_wallets() if w['name'] == wallet_name),
        None,
    )
    if wallet is None:
        raise Exception(f'Unknown wallet: {wallet_name}')

    generated_url = await connector.connect(wallet)

    mk_b = InlineKeyboardBuilder()
    mk_b.button(text='Connect', url=generated_url)
    await message.answer(text='Connect wallet within 3 minutes', reply_markup=mk_b.as_markup())

    # Keyboard attached to the final (success or timeout) message.
    mk_b = InlineKeyboardBuilder()
    mk_b.button(text='Start', callback_data='start')

    # 180 one-second polls, i.e. the full three minutes promised to the user.
    for _ in range(180):
        await asyncio.sleep(1)
        if connector.connected:
            if connector.account.address:
                wallet_address = connector.account.address
                wallet_address = Address(wallet_address).to_str(is_bounceable=False)
                await message.answer(f'You are connected with address <code>{wallet_address}</code>', reply_markup=mk_b.as_markup())
                logger.info(f'Connected with address: {wallet_address}')
            return

    await message.answer('Timeout error!', reply_markup=mk_b.as_markup())
@dp.callback_query(lambda call: True)
async def main_callback_handler(call: CallbackQuery):
    """Route every inline-button callback to the matching action.

    Recognised callback data: ``start``, ``send_tr``, ``disconnect`` and
    ``connect:<wallet name>``. Anything else is acknowledged and ignored.
    """
    # Acknowledge the callback before doing any further work.
    await call.answer()
    message = call.message
    # Treat missing callback data as an empty string.
    data = call.data or ''

    if data == 'start':
        await command_start_handler(message)
    elif data == 'send_tr':
        await send_transaction(message)
    elif data == 'disconnect':
        await disconnect_wallet(message)
    else:
        # partition() splits on the first ':' only, so a wallet name that
        # itself contains ':' is kept whole and data without ':' cannot
        # raise an IndexError.
        action, _, wallet_name = data.partition(':')
        if action == 'connect':
            await connect_wallet(message, wallet_name)
机器人给用户 3 分钟时间连接钱包,之后会报告超时错误。
实现交易请求
让我们采用“消息构建器”一文中的一个示例:
# messages.py
from base64 import urlsafe_b64encode
from pytoniq_core import begin_cell
def get_comment_message(destination_address: str, amount: int, comment: str) -> dict:
    """Build a message dict carrying ``amount`` and a text ``comment`` payload.

    The returned dict has the keys ``address``, ``amount`` (as a string) and
    ``payload`` (URL-safe base64 text of the serialized comment cell).
    """
    comment_cell = (
        begin_cell()
        .store_uint(0, 32)  # 32-bit zero op code marks a comment message
        .store_string(comment)
        .end_cell()
    )
    # Serialize the cell to a BOC, base64-encode it, then turn bytes into str.
    payload = urlsafe_b64encode(comment_cell.to_boc()).decode()
    return {
        'address': destination_address,
        'amount': str(amount),
        'payload': payload,
    }
并在 main.py
文件中添加 send_transaction()
函数:
# main.py
@dp.message(Command('transaction'))
async def send_transaction(message: Message):
    """Ask the connected wallet to approve a demo transaction with a comment."""
    connector = get_connector(message.chat.id)
    if not await connector.restore_connection():
        await message.answer('Connect wallet first!')
        return

    # The request is valid for one hour from now.
    expires_at = int(time.time() + 3600)
    comment_message = get_comment_message(
        destination_address='0:0000000000000000000000000000000000000000000000000000000000000000',
        amount=int(0.01 * 10 ** 9),
        comment='hello world!',
    )
    transaction = {
        'valid_until': expires_at,
        'messages': [comment_message],
    }

    await message.answer(text='Approve transaction in your wallet app!')
    await connector.send_transaction(transaction=transaction)
但我们还应该处理可能出现的错误,因此我们将 send_transaction 方法的调用放入 try - except 语句中:
@dp.message(Command('transaction'))
async def send_transaction(message: Message):
    # The unchanged first part of the handler is elided here.
    ...
    await message.answer(text='Approve transaction in your wallet app!')
    try:
        pending = connector.send_transaction(transaction=transaction)
        # Wait at most 300 seconds for the wallet to respond.
        await asyncio.wait_for(pending, timeout=300)
    except asyncio.TimeoutError:
        await message.answer(text='Timeout error!')
    except pytonconnect.exceptions.UserRejectsError:
        await message.answer(text='You rejected the transaction!')
    except Exception as e:
        # Last-resort handler: report any other failure to the user.
        await message.answer(text=f'Unknown error: {e}')
添加断开连接处理器
这个函数的实现非常简单:
async def disconnect_wallet(message: Message):
    """Disconnect the wallet bound to this chat and confirm it to the user.

    When no wallet session can be restored, the user is asked to connect
    first and ``disconnect()`` is not called.
    """
    connector = get_connector(message.chat.id)
    # Same guard as send_transaction(): without a restored session there is
    # nothing to disconnect (e.g. the "Disconnect" button was pressed twice).
    connected = await connector.restore_connection()
    if not connected:
        await message.answer('Connect wallet first!')
        return
    await connector.disconnect()
    await message.answer('You have been successfully disconnected!')
目前,项目结构如下:
.
├── .env
├── config.py
├── connector.py
├── main.py
├── messages.py
└── tc_storage.py
main.py
的内容如下:
展示 main.py
# main.py
import sys
import logging
import asyncio
import time
import pytonconnect.exceptions
from pytoniq_core import Address
from pytonconnect import TonConnect
import config
from messages import get_comment_message
from connector import get_connector
from aiogram import Bot, Dispatcher, F
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart, Command
from aiogram.types import Message, CallbackQuery
from aiogram.utils.keyboard import InlineKeyboardBuilder
# Module-level logger; __name__ is the conventional logger name (a file path
# would make every log line carry the full path of the script).
logger = logging.getLogger(__name__)
# Dispatcher on which the handlers below are registered.
dp = Dispatcher()
# Bot client created from the configured token, with HTML as parse mode.
bot = Bot(config.TOKEN, parse_mode=ParseMode.HTML)
@dp.message(CommandStart())
async def command_start_handler(message: Message):
    """Show the action menu to connected users, or the wallet picker otherwise."""
    connector = get_connector(message.chat.id)
    is_connected = await connector.restore_connection()
    keyboard = InlineKeyboardBuilder()

    if is_connected:
        keyboard.button(text='Send Transaction', callback_data='send_tr')
        keyboard.button(text='Disconnect', callback_data='disconnect')
        await message.answer(text='You are already connected!', reply_markup=keyboard.as_markup())
        return

    # No restored session: offer one button per available wallet.
    for entry in TonConnect.get_wallets():
        wallet_name = entry['name']
        keyboard.button(text=wallet_name, callback_data=f'connect:{wallet_name}')
    # One button per row.
    keyboard.adjust(1)
    await message.answer(text='Choose wallet to connect', reply_markup=keyboard.as_markup())
@dp.message(Command('transaction'))
async def send_transaction(message: Message):
    """Ask the connected wallet to approve a demo transaction and report failures.

    The user is told when no wallet is connected, when the wallet does not
    answer within 300 seconds, when the request is rejected, or when any
    other error occurs.
    """
    connector = get_connector(message.chat.id)
    if not await connector.restore_connection():
        await message.answer('Connect wallet first!')
        return

    # The request is valid for one hour from now.
    expires_at = int(time.time() + 3600)
    comment_message = get_comment_message(
        destination_address='0:0000000000000000000000000000000000000000000000000000000000000000',
        amount=int(0.01 * 10 ** 9),
        comment='hello world!',
    )
    transaction = {
        'valid_until': expires_at,
        'messages': [comment_message],
    }

    await message.answer(text='Approve transaction in your wallet app!')
    try:
        pending = connector.send_transaction(transaction=transaction)
        # Wait at most 300 seconds for the wallet to respond.
        await asyncio.wait_for(pending, timeout=300)
    except asyncio.TimeoutError:
        await message.answer(text='Timeout error!')
    except pytonconnect.exceptions.UserRejectsError:
        await message.answer(text='You rejected the transaction!')
    except Exception as e:
        # Last-resort handler: report any other failure to the user.
        await message.answer(text=f'Unknown error: {e}')
async def connect_wallet(message: Message, wallet_name: str):
    """Send a connect link for ``wallet_name`` and wait for the wallet to connect.

    The connector is polled once per second for up to three minutes. When the
    wallet connects, its address is reported in non-bounceable form; when the
    time runs out, a timeout message is sent instead.

    Raises:
        Exception: if ``wallet_name`` is not among the connector's wallets.
    """
    connector = get_connector(message.chat.id)

    # Pick the wallet description whose name matches the pressed button.
    wallet = next(
        (w for w in connector.get_wallets() if w['name'] == wallet_name),
        None,
    )
    if wallet is None:
        raise Exception(f'Unknown wallet: {wallet_name}')

    generated_url = await connector.connect(wallet)

    mk_b = InlineKeyboardBuilder()
    mk_b.button(text='Connect', url=generated_url)
    await message.answer(text='Connect wallet within 3 minutes', reply_markup=mk_b.as_markup())

    # Keyboard attached to the final (success or timeout) message.
    mk_b = InlineKeyboardBuilder()
    mk_b.button(text='Start', callback_data='start')

    # 180 one-second polls, i.e. the full three minutes promised to the user.
    for _ in range(180):
        await asyncio.sleep(1)
        if connector.connected:
            if connector.account.address:
                wallet_address = connector.account.address
                wallet_address = Address(wallet_address).to_str(is_bounceable=False)
                await message.answer(f'You are connected with address <code>{wallet_address}</code>', reply_markup=mk_b.as_markup())
                logger.info(f'Connected with address: {wallet_address}')
            return

    await message.answer('Timeout error!', reply_markup=mk_b.as_markup())
async def disconnect_wallet(message: Message):
    """Disconnect the wallet bound to this chat and confirm it to the user.

    When no wallet session can be restored, the user is asked to connect
    first and ``disconnect()`` is not called.
    """
    connector = get_connector(message.chat.id)
    # Same guard as send_transaction(): without a restored session there is
    # nothing to disconnect (e.g. the "Disconnect" button was pressed twice).
    connected = await connector.restore_connection()
    if not connected:
        await message.answer('Connect wallet first!')
        return
    await connector.disconnect()
    await message.answer('You have been successfully disconnected!')
@dp.callback_query(lambda call: True)
async def main_callback_handler(call: CallbackQuery):
    """Route every inline-button callback to the matching action.

    Recognised callback data: ``start``, ``send_tr``, ``disconnect`` and
    ``connect:<wallet name>``. Anything else is acknowledged and ignored.
    """
    # Acknowledge the callback before doing any further work.
    await call.answer()
    message = call.message
    # Treat missing callback data as an empty string.
    data = call.data or ''

    if data == 'start':
        await command_start_handler(message)
    elif data == 'send_tr':
        await send_transaction(message)
    elif data == 'disconnect':
        await disconnect_wallet(message)
    else:
        # partition() splits on the first ':' only, so a wallet name that
        # itself contains ':' is kept whole and data without ':' cannot
        # raise an IndexError.
        action, _, wallet_name = data.partition(':')
        if action == 'connect':
            await connect_wallet(message, wallet_name)
async def main() -> None:
    """Remove the webhook, dropping pending updates, then start polling."""
    # Dropping pending updates plays the role of the old skip_updates=True.
    skip_pending = True
    await bot.delete_webhook(drop_pending_updates=skip_pending)
    await dp.start_polling(bot)
if __name__ == "__main__":
    # Send INFO-and-above log records to stdout before the event loop starts.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    asyncio.run(main())
改进
添加持久存储 - Redis
目前,我们的 TON Connect 存储使用字典,导致机器人重启后会丢失会话。 让我们使用 Redis 添加永久数据库存储:
在启动 Redis 数据库后安装用于与之交互的 python 库:
pip install redis
并在 tc_storage.py
中更新 TcStorage
类:
import redis.asyncio as redis
client = redis.Redis(host='localhost', port=6379)
class TcStorage(IStorage):
    """Per-chat TON Connect storage persisted through the module-level Redis ``client``.

    Every key is prefixed with the chat id so that entries belonging to
    different chats do not collide.
    """

    def __init__(self, chat_id: int):
        self.chat_id = chat_id

    def _get_key(self, key: str) -> str:
        """Return ``key`` prefixed with this instance's chat id."""
        return str(self.chat_id) + key

    async def set_item(self, key: str, value: str) -> None:
        """Store ``value`` under the chat-scoped ``key``."""
        await client.set(name=self._get_key(key), value=value)

    async def get_item(self, key: str, default_value: str = None):
        """Return the stored value decoded to ``str``, or ``default_value`` if absent."""
        value = await client.get(name=self._get_key(key))
        # Compare with None explicitly so that a stored empty string is
        # returned as '' rather than being mistaken for a missing key.
        if value is None:
            return default_value
        return value.decode()

    async def remove_item(self, key: str) -> None:
        """Delete the chat-scoped ``key``."""
        await client.delete(self._get_key(key))
添加二维码
安装 Python 的 qrcode 包以生成二维码:
pip install qrcode
更改 connect_wallet()
函数,使其生成二维码并以图片形式发送给用户:
from io import BytesIO
import qrcode
from aiogram.types import BufferedInputFile
async def connect_wallet(message: Message, wallet_name: str):
    # The unchanged first part of the function is elided here.
    ...
    # Render the connect URL as a QR image into an in-memory buffer.
    buffer = BytesIO()
    qrcode.make(generated_url).save(buffer)
    qr_photo = BufferedInputFile(file=buffer.getvalue(), filename='qrcode')
    await message.answer_photo(photo=qr_photo, caption='Connect wallet within 3 minutes', reply_markup=mk_b.as_markup())
    # The unchanged rest of the function is elided here.
    ...
总结
接下来可以做什么?
- 您可以在机器人中添加更好的错误处理。
- 您可以添加启动文本以及类似 /connect_wallet 的命令。