
1. Introduction to the subscription agent

2. Basic concepts

You should be familiar with the following:

  • Building an ordinary agent: see the earlier tutorials
  • Web crawling
  • Fundamentals:
    • HTML: understand HTML's basic structure, tags, and common elements
    • CSS: understand the role CSS plays in HTML
    • Tooling: be able to locate page elements with the browser's developer tools
  • Python tools (see the warm-up sketch after this list)
    • aiohttp: know the basics of the aiohttp library and how to issue network requests with it
    • BeautifulSoup: know the BeautifulSoup HTML-parsing library and how to use it to extract information from HTML
  • The MetaGPT subscription module
  • The concepts of Role, Trigger, and Callback
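As a warm-up for the two libraries, here is a minimal sketch (example.com is just a placeholder) that fetches a page with aiohttp and pulls its <title> out with BeautifulSoup:

```python
import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch_title(url: str) -> str:
    # Fetch the page body asynchronously, then parse it
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            html = await response.text()
    return BeautifulSoup(html, "html.parser").title.get_text(strip=True)

print(asyncio.run(fetch_title("https://example.com")))
```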

The Trigger above is an asynchronous generator (see https://peps.python.org/pep-0525/); knowing how to implement one is enough.
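For intuition, a minimal trigger sketch (the Message schema comes from metagpt.schema, as in the full code further down; the interval is an arbitrary example):

```python
import asyncio
from metagpt.schema import Message

async def daily_trigger(interval_seconds: int = 24 * 3600):
    # An async generator (PEP 525): yield a Message, then sleep until the next round
    while True:
        yield Message(content="https://github.com/trending")
        await asyncio.sleep(interval_seconds)
```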

3. Building the agent

The code below is run in a JupyterLab notebook from the MetaGPT installation root.

3.1 Processing the GitHub Trending HTML

Following the tutorial, first fetch the relevant GitHub Trending HTML and save it locally, as in the sketch below.
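For example, something along these lines can be run in a notebook cell (pass a proxy to session.get if your network needs one):

```python
import aiohttp

async def save_raw_html(url: str = "https://github.com/trending",
                        path: str = "github-trending-raw.html"):
    # Download the trending page and keep a local copy for offline testing
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            html = await response.text()
    with open(path, "w") as f:
        f.write(html)

# In JupyterLab: await save_raw_html()
```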

Then prettify and slim it down to reduce the amount of markup.

# Slimming pass; test against the locally saved HTML first
from bs4 import BeautifulSoup

with open("github-trending-raw.html") as f:
    html = f.read()

localhtml = html
soup = BeautifulSoup(html, "html.parser")
for i in soup.find_all(True):
    for name in list(i.attrs):
        if i[name] and name not in ["class"]:
            del i[name]

for i in soup.find_all(["svg", "img", "video", "audio"]):
    i.decompose()

with open("github-trending-slim.html", "w") as f:
    f.write(str(soup))
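As a quick optional sanity check of how much the slimming saved:

```python
# localhtml still holds the raw page; soup is the slimmed tree
print(f"raw: {len(localhtml)} chars -> slim: {len(str(soup))} chars")
```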

3.2 Implementing the GitHub Trending crawler

Zhipu's GLM-4 has just been released, so this is a good opportunity to compare its code generation against GPT-4.

Both were asked the same prompt:

You are a crawler engineer proficient in Python. You need to use aiohttp to crawl https://github.com/trending, then use BeautifulSoup to parse several fields from the list: repository name, repository URL, repository description, star count, fork count, language, and today's star count.
The target HTML has the following structure:
```html
<div data-hpc="">
<article class="Box-row">
...
# two slimmed-down HTML entries pasted here
...
</div>
```

GLM-4's answer:

To scrape the GitHub trending page using aiohttp and BeautifulSoup, you can use the following Python script. This script will extract the required fields for each trending repository:
```python
import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    url = 'https://github.com/trending'
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, url)
        soup = BeautifulSoup(html, 'html.parser')

        # Find all the articles that contain repository information
        articles = soup.find_all('article', class_='Box-row')

        # Iterate over each article and extract the required fields
        for article in articles:
            # Extract repository name and URL
            repo_name = article.find('a', class_='Link').get_text(strip=True)
            repo_url = f"https://github.com/{repo_name}"

            # Extract repository description
            repo_description = article.find('p', class_='col-9 color-fg-muted my-1 pr-4').get_text(strip=True)

            # Extract star, fork, language, and today's star count
            stats = article.find_all('a', class_='Link Link--muted d-inline-block mr-3')
            star_count = stats[0].get_text(strip=True)
            fork_count = stats[1].get_text(strip=True)
            language = article.find('span', class_='repo-language-color').next_sibling.strip()
            today_star_count = article.find('span', class_='d-inline-block float-sm-right').get_text(strip=True)

            # Print the extracted information
            print(f"Repository Name: {repo_name}")
            print(f"Repository URL: {repo_url}")
            print(f"Description: {repo_description}")
            print(f"Stars: {star_count}")
            print(f"Forks: {fork_count}")
            print(f"Language: {language}")
            print(f"Stars Today: {today_star_count}")
            print("\n")  # Add a newline for better readability

if __name__ == '__main__':
    asyncio.run(main())
```
Please note the following:
  1. The script uses aiohttp for asynchronous HTTP requests and BeautifulSoup for parsing the HTML content.
  2. The main coroutine fetches the trending page and then parses it to find the articles containing repository information.
  3. For each article, the script extracts the repository name, URL, description, star count, fork count, language, and today's star count.
  4. The script prints the extracted information. You can modify this part to save the data to a file or database as needed.

Make sure to have aiohttp and BeautifulSoup installed in your Python environment before running this script: `pip install aiohttp beautifulsoup4`
Please also be aware that web scraping can be against the terms of service of some websites. Always check the website's robots.txt file and terms of service before scraping, and respect the website's rules regarding automated access.

GPT-4's answer:

To parse the above HTML structure correctly with aiohttp handling the asynchronous requests, we first write an async function to fetch the page content. Then we use BeautifulSoup to parse the HTML and extract the required fields. Here is the Python code for these tasks:
```python
import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch_trending_repos(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            assert response.status == 200
            return await response.text()

def parse_trending_repos(html):
    soup = BeautifulSoup(html, 'html.parser')
    repos_data = []

    # Find all repository entries
    for article in soup.find_all('article', class_='Box-row'):
        # Parse the repository name and URL
        a_tag = article.h2.a
        repo_url = a_tag.attrs['href']
        repo_name = a_tag.text.strip()

        # Parse the repository description
        p_tag = article.find('p', class_='color-fg-muted')
        repo_description = p_tag.text.strip() if p_tag else None

        # Parse star and fork counts (note: these may be style-dependent; check against the latest front-end code)
        a_tags = article.find_all('a', class_='Link--muted')
        star_count = a_tags[0].text.strip().replace(',', '') if len(a_tags) > 0 else '0'
        fork_count = a_tags[1].text.strip().replace(',', '') if len(a_tags) > 1 else '0'

        # Parse the programming language
        language_tag = article.find('span', itemprop='programmingLanguage')
        language = language_tag.text.strip() if language_tag else None

        # Parse today's star count
        span_tag = article.find('span', text=lambda x: x and 'stars today' in x)
        stars_today_count = span_tag.text.strip().split(' ')[0].replace(',', '') if span_tag else '0'

        # Combine all parsed fields into a dict
        repo_data = {
            'repo_name': repo_name,
            'repo_url': f'https://github.com{repo_url}',
            'description': repo_description,
            'stars': star_count,
            'forks': fork_count,
            'language': language,
            'stars_today': stars_today_count
        }
        repos_data.append(repo_data)

    return repos_data

async def main():
    url = 'https://github.com/trending'
    html_content = await fetch_trending_repos(url)
    repos_data = parse_trending_repos(html_content)
    for repo in repos_data:
        print(repo)

# Run the async main() function
asyncio.run(main())
```
Notes:
  • The HTML structure changes as GitHub updates; the class names of the target fields (such as .Link--muted and .Box-row) may change, so it's best to check the latest HTML before use.
  • In real use you may need to add error handling, especially around network requests.
  • The snippet above may need adapting to the live HTML structure; test it before using it in production.

The code first defines an async function fetch_trending_repos that issues a GET request to the given URL and returns the response body. Then parse_trending_repos takes the HTML fetched from GitHub, parses it with BeautifulSoup, and extracts the required fields. Finally, main chains these steps together, runs them asynchronously, and prints each repository's data.

In Jupyter, we need to change asyncio.run(main()) to await main() for testing.
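That is, in a notebook cell:

```python
# Jupyter already runs an event loop, so asyncio.run(main()) would raise;
# await the coroutine directly instead
await main()
```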


Zhihu's markdown support is absolutely terrible; bear with it for now, or just skip to my conclusions below.

The results show that both have problems handling the language field. GLM-4's version raises:

---> 35 language = article.find('span', class_='repo-language-color').next_sibling.strip()
AttributeError: 'NoneType' object has no attribute 'next_sibling'

This is because some projects on the live page don't have this field, so the corresponding <span> is missing. It's also partly because I only provided two project entries as examples, neither of which showed the missing-field case, so the model can't be blamed entirely. Adding an if check is all it takes; see the sketch below.
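For example, one way to write that guard into GLM-4's parsing line (a sketch, not the final version used in 3.3):

```python
# Guard against entries that have no language span at all
language_span = article.find('span', class_='repo-language-color')
if language_span and language_span.next_sibling:
    language = language_span.next_sibling.strip()
else:
    language = None
```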

GPT-4, on the other hand, fails to recognize language at all:

---> 32 language_tag = article.find('span', itemprop='programmingLanguage')
AttributeError: 'NoneType' object has no attribute

This is because the BeautifulSoup slimming pass stripped the programmingLanguage attribute from the HTML, so it can no longer be found. But the example I supplied was the slimmed HTML; how did GPT-4 ignore the code I gave it and go straight to the structure of GitHub Trending's native HTML? Should that count as smart, or as too clever by half? It seems to support some popular suspicions:

  • GPT-4 probably keeps data from previously asked questions (such as the examples shared in the tutorial, since the prompt is almost identical) and saves compute by retrieving it directly or indirectly
  • GPT-4's reasoning ability may come more from experience (previously accumulated data) than from logic
  • GPT-4 does cut corners

By comparison, GLM-4 is more like a dull but honest worker, though bosses do love that kind of "employee", right [doge]

3.3 Building the agent's Action

Combining the two models' code and adjusting it, we build an agent Action:

import aiohttp
from bs4 import BeautifulSoup
from metagpt.actions.action import Action
from metagpt.config import CONFIG

class CrawlGithubTrending(Action):

    async def run(self, url: str = "https://github.com/trending"):
        async with aiohttp.ClientSession() as client:
            async with client.get(url, proxy=CONFIG.global_proxy) as response:
                response.raise_for_status()
                html = await response.text()

        soup = BeautifulSoup(html, 'html.parser')

        repos_data = []

        for article in soup.select('article.Box-row'):
            # Parse the repository name and URL
            repo_name = article.find('a', class_='Link').get_text(strip=True)
            repo_url = f"https://github.com/{repo_name}"

            # Parse the repository description
            p_tag = article.find('p', class_='col-9 color-fg-muted my-1 pr-4')
            repo_description = p_tag.get_text(strip=True) if p_tag else None

            # Parse star and fork counts (note: these depend on styling classes; check against the latest front-end code)
            a_tags = article.find_all('a', class_='Link--muted')
            star_count = a_tags[0].text.strip().replace(',', '') if len(a_tags) > 0 else '0'
            fork_count = a_tags[1].text.strip().replace(',', '') if len(a_tags) > 1 else '0'

            # Parse the programming language; default to None so the variable is always bound
            language = None
            language_tag_pre = article.find('span', class_='repo-language-color')
            if language_tag_pre:
                language_tag = language_tag_pre.find_next_sibling()
                language = language_tag.get_text(strip=True) if language_tag else None

            # Parse today's star count
            span_tag = article.find('span', text=lambda x: x and 'stars today' in x)
            stars_today_count = span_tag.text.strip().split(' ')[0].replace(',', '') if span_tag else '0'

            # Combine all parsed fields into a dict
            repo_data = {
                'repo_name': repo_name,
                'repo_url': repo_url,
                'description': repo_description,
                'stars': star_count,
                'forks': fork_count,
                'language': language,
                'stars_today': stars_today_count
            }
            repos_data.append(repo_data)

        return repos_data

Test:

testAct = CrawlGithubTrending()

resp = await testAct.run()

The output, when it works, looks like this:

2024-01-19 11:42:36.078 | INFO     | metagpt.config:get_default_llm_provider_enum:124 - LLMProviderEnum.OPENAI Model: gpt-4-1106-preview
2024-01-19 11:42:36.079 | INFO     | metagpt.config:get_default_llm_provider_enum:126 - API: LLMProviderEnum.OPENAI
/var/folders/35/bc2v59f502l5ny0rlny0_0900000gn/T/ipykernel_21306/1158858136.py:42: DeprecationWarning: The 'text' argument to find()-type methods is deprecated. Use 'string' instead.
  span_tag = article.find('span', text=lambda x: x and 'stars today' in x)

Print resp to inspect the result:

[{'repo_name': 'xiaolai /everyone-can-use-english',
  'repo_url': 'https://github.com/xiaolai /everyone-can-use-english',
  'description': '人人都能用英语',
  'stars': '15324',
  'forks': '2691',
  'language': 'Jupyter Notebook',
  'stars_today': '0'},
 {'repo_name': 'TencentARC /PhotoMaker',
...
{'repo_name': 'kedacore /keda',
  'repo_url': 'https://github.com/kedacore /keda',
  'description': 'KEDA is a Kubernetes-based Event Driven Autoscaling component. It provides event driven scale for any container running in Kubernetes',
  'stars': '7373',
  'forks': '932',
  'language': 'Go',
  'stars_today': '0'}]

3.4 Summarizing today's list

With the content correctly crawled, the rest is straightforward: write another Action to summarize it. Here I swap in a Chinese version of the prompt to try it out:

from typing import Any
from metagpt.actions.action import Action

TRENDING_ANALYSIS_PROMPT = """# 任务要求
你是一位github项目趋势分析员. 你被要求基于[github最新趋势文本]中的项目名称、内容、星星数等参数,
汇总生成一篇报告,向用户提供其中的亮点和个性化推荐. 

内容风格请参考以下大纲:
# 今日 GitHub Trending 趋势的标题 (标题应生动并体现今日内容的亮点)
## 今日趋势:揭秘今日GitHub热门项目!探索流行的编程语言,并发现吸引开发者注意的关键领域。从**到**,以前所未有的方式见证顶级项目。
## 趋势类别:深入探索今日GitHub热门领域!在诸如**和**等特定领域中探索特色项目。快速概览每个项目,包括编程语言、星标数等更多信息。
## 列表亮点:聚焦GitHub趋势上的值得关注项目,包括新工具、创新项目以及迅速获得人气的项目,专注于为用户提供独特且引人注目的内容。


报告内容请严格按照以下格式生成:
```
# 今日GitHub趋势
## 今日趋势
今日,**和**继续作为最受欢迎的编程语言占据主导地位。关键兴趣领域包括**、**和**。
最受欢迎的项目有Project1和Project2。
## 趋势类别
1. 生成式AI
    - [Project1](https://github.com/xx/project1):[项目详情,例如星标总数和今日新增,编程语言,...]
    - [Project2](https://github.com/xx/project2):...
...
## 列表亮点
1. [Project1](https://github.com/xx/project1):[提供推荐此项目的具体原因]。
2. [Project2](https://github.com/xx/project2):[提供推荐此项目的具体原因]。
3. ...
```

---
[github最新趋势文本]:
{trending}
"""

class AnalysisOSSTrending(Action):

    async def run(
        self,
        trending: Any
    ):
        return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))

Test code and result using the default GPT-4 for the summary:

testAnalysis = AnalysisOSSTrending()

resp2 = await testAnalysis.run(resp)

[Screenshot: the summary generated by GPT-4]

Test code and result with GLM-4 specified instead:

from metagpt.llm import LLM
from metagpt.provider.base_llm import BaseLLM
from metagpt.config import CONFIG, LLMProviderEnum

# The user manual doesn't document this way of pinning an LLM to a specific Action; I dug through the source and tested for quite a while, and only cracked it after dumping the relevant code into GPT-4
testAnalysis.llm = LLM(LLMProviderEnum.ZHIPUAI)

resp3 = await testAnalysis.run(resp)

[Screenshot: the summary generated by GLM-4]

Overall, GLM-4's summarization reaches a usable level, but it still falls a bit short of GPT-4.

GPT-4's title really nails the flavor!

GLM-4, by contrast, produces less text, leaves it incomplete, and even lazily trails off into ellipses... Then, at the very end, it stacks on a redundant disclaimer for good measure. Unbelievable.

Scheduled triggering

No actual need for this here, so it's skipped for now; the tutorial covers it fully (the full code below does include a cron-based trigger).

Callback design

Push notifications are implemented via wxpusher.

WXPUSHER_TOKEN is the wxpusher APP_TOKEN; see the official docs for how to obtain an appToken.

WXPUSHER_UIDS: each user's UID can be found on the app management page under "User Management -> User List"; to send to multiple users, separate the UIDs with commas. A hypothetical .env layout is sketched below.
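The snippets below read both values via os.environ; if you keep them in a .env file (loaded in the full code further down), a hypothetical layout looks like this (all values are placeholders):

```
WXPUSHER_TOKEN=AT_xxxxxxxxxxxxxxxx
WXPUSHER_UIDS=UID_aaaaaaaa,UID_bbbbbbbb
```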

import os
from typing import Optional

import aiohttp
from metagpt.schema import Message  # used by wxpusher_callback below


class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode"):
        self.base_url = base_url
        self.token = token or os.environ["WXPUSHER_TOKEN"]

    async def send_message(
        self,
        content,
        summary: Optional[str] = None,
        content_type: int = 1,
        topic_ids: Optional[list[int]] = None,
        uids: Optional[list[int]] = None,
        verify: bool = False,
        url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                return await response.json()


async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)
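A quick manual test of the callback in a notebook cell (assumes the two environment variables above are set; content_type=3 asks wxpusher to render the content as markdown):

```python
await wxpusher_callback(Message(content="# Hello from OssWatcher"))
```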

Full code

import asyncio
import os
from typing import Any, AsyncGenerator, Awaitable, Callable, Dict, Optional

import aiohttp
import discord
from aiocron import crontab
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from pytz import BaseTzInfo

from metagpt.actions.action import Action
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message

# fix SubscriptionRunner not fully defined
from metagpt.environment import Environment as _  # noqa: F401


# Load .env into environment variables (create a .env file in the same directory and add the token and UIDs)
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())

# Subscription module; it can be imported with from metagpt.subscription import SubscriptionRunner — pasted here for reference
class SubscriptionRunner(BaseModel):
    """ A simple wrapper to manage subscription tasks for different roles using asyncio.
        Example:
           # >>> import asyncio
           # >>> from metagpt.subscription import SubscriptionRunner
           # >>> from metagpt.roles import Searcher
           # >>> from metagpt.schema import Message
           # >>> async def trigger():
           # ...     while True:
           # ...         yield Message("the latest news about OpenAI")
           # ...         await asyncio.sleep(3600 * 24)
           # >>> async def callback(msg: Message):
           # ...     print(msg.content)
           # >>> async def main():
           # ...     pb = SubscriptionRunner()
           # ...     await pb.subscribe(Searcher(), trigger(), callback)
           # ...     await pb.run()
           # >>> asyncio.run(main())
   """

    tasks: Dict[Role, asyncio.Task] = Field(default_factory=dict)

    class Config:
        arbitrary_types_allowed = True

    async def subscribe(
        self,
        role: Role,
        trigger: AsyncGenerator[Message, None],
        callback: Callable[
            [
                Message,
            ],
            Awaitable[None],
        ],
    ):
        """Subscribes a role to a trigger and sets up a callback to be called with the role's response.
        Args:
            role: The role to subscribe.
            trigger: An asynchronous generator that yields Messages to be processed by the role.
            callback: An asynchronous function to be called with the response from the role.
        """
        loop = asyncio.get_running_loop()

        async def _start_role():
            async for msg in trigger:
                resp = await role.run(msg)
                await callback(resp)

        self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")

    async def unsubscribe(self, role: Role):
        """Unsubscribes a role from its trigger and cancels the associated task.
        Args:
            role: The role to unsubscribe.
        """
        task = self.tasks.pop(role)
        task.cancel()

    async def run(self, raise_exception: bool = True):
        """Runs all subscribed tasks and handles their completion or exception.
        Args:
            raise_exception: _description_. Defaults to True.
        Raises:
            task.exception: _description_
        """
        while True:
            for role, task in self.tasks.items():
                if task.done():
                    if task.exception():
                        if raise_exception:
                            raise task.exception()
                        logger.opt(exception=task.exception()).error(
                            f"Task {task.get_name()} run error"
                        )
                    else:
                        logger.warning(
                            f"Task {task.get_name()} has completed. "
                            "If this is unexpected behavior, please check the trigger function."
                        )
                    self.tasks.pop(role)
                    break
            else:
                await asyncio.sleep(1)


# Action implementations
TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending Analyst, aiming to provide users with insightful and personalized recommendations based on the latest
GitHub Trends. Based on the context, fill in the following missing information, generate engaging and informative titles, 
ensuring users discover repositories aligned with their interests.

# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example

```
# [Title]

## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.

## The Trends Categories
1. Generative AI
    - [Project1](https://github.com/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github.com/xx/project2): ...
...

## Highlights of the List
1. [Project1](https://github.com/xx/project1): [provide specific reasons why this project is recommended].
...
```

---
# Github Trending
{trending}
"""


class CrawlOSSTrending(Action):
    async def run(self, url: str = "https://github/trending"):
        async with aiohttp.ClientSession() as client:
            async with client.get(url, proxy=CONFIG.global_proxy) as response:
                response.raise_for_status()
                html = await response.text()

        soup = BeautifulSoup(html, "html.parser")

        repositories = []

        for article in soup.select("article.Box-row"):
            repo_info = {}

            repo_info["name"] = (
                article.select_one("h2 a")
                .text.strip()
                .replace("\n", "")
                .replace(" ", "")
            )
            repo_info["url"] = (
                "https://github" + article.select_one("h2 a")["href"].strip()
            )

            # Description
            description_element = article.select_one("p")
            repo_info["description"] = (
                description_element.text.strip() if description_element else None
            )

            # Language
            language_element = article.select_one(
                'span[itemprop="programmingLanguage"]'
            )
            repo_info["language"] = (
                language_element.text.strip() if language_element else None
            )

            # Stars and Forks
            stars_element = article.select("a.Link--muted")[0]
            forks_element = article.select("a.Link--muted")[1]
            repo_info["stars"] = stars_element.text.strip()
            repo_info["forks"] = forks_element.text.strip()

            # Today's Stars
            today_stars_element = article.select_one(
                "span.d-inline-block.float-sm-right"
            )
            repo_info["today_stars"] = (
                today_stars_element.text.strip() if today_stars_element else None
            )

            repositories.append(repo_info)

        return repositories


class AnalysisOSSTrending(Action):
    async def run(self, trending: Any):
        return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))


# Role implementation
class OssWatcher(Role):
    def __init__(
        self,
        name="Codey",
        profile="OssWatcher",
        goal="Generate an insightful GitHub Trending analysis report.",
        constraints="Only analyze based on the provided GitHub Trending data.",
    ):
        super().__init__(name=name, profile=profile, goal=goal, constraints=constraints)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
        self._set_react_mode(react_mode="by_order")

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self.rc.todo}")
        # With react_mode="by_order", todo will be CrawlOSSTrending first, then AnalysisOSSTrending
        todo = self.rc.todo

        msg = self.get_memories(k=1)[0]  # take the most recent message (k=1)
        result = await todo.run(msg.content)

        msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self.rc.memory.add(msg)
        return msg


# Trigger
class GithubTrendingCronTrigger:
    def __init__(
        self,
        spec: str,
        tz: Optional[BaseTzInfo] = None,
        url: str = "https://github/trending",
    ) -> None:
        self.crontab = crontab(spec, tz=tz)
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):
        await self.crontab.next()
        return Message(content=self.url)


# callback
async def discord_callback(msg: Message):
    intents = discord.Intents.default()
    intents.message_content = True
    client = discord.Client(intents=intents, proxy=CONFIG.global_proxy)
    token = os.environ["DISCORD_TOKEN"]
    channel_id = int(os.environ["DISCORD_CHANNEL_ID"])
    async with client:
        await client.login(token)
        channel = await client.fetch_channel(channel_id)
        lines = []
        for i in msg.content.splitlines():
            if i.startswith(("# ", "## ", "### ")):
                if lines:
                    await channel.send("\n".join(lines))
                    lines = []
            lines.append(i)

        if lines:
            await channel.send("\n".join(lines))


class WxPusherClient:
    def __init__(
        self,
        token: Optional[str] = None,
        base_url: str = "https://wxpusher.zjiecode",
    ):
        self.base_url = base_url
        self.token = token or os.environ["WXPUSHER_TOKEN"]

    async def send_message(
        self,
        content,
        summary: Optional[str] = None,
        content_type: int = 1,
        topic_ids: Optional[list[int]] = None,
        uids: Optional[list[int]] = None,
        verify: bool = False,
        url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                resp = await response.json()
                print(resp)
                return resp


async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)


# Entry point
async def main(spec: str = "*/2 * * * *", discord: bool = False, wxpusher: bool = True):
    callbacks = []
    if discord:
        callbacks.append(discord_callback)

    if wxpusher:
        callbacks.append(wxpusher_callback)

    if not callbacks:

        async def _print(msg: Message):
            print(msg.content)

        callbacks.append(_print)

    async def callback(msg):
        await asyncio.gather(*(call(msg) for call in callbacks))

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)
    await runner.run()


if __name__ == "__main__":
    import fire
    fire.Fire(main)
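Since the entry point is wrapped with fire.Fire, the parameters of main become command-line flags. For example, assuming the file is saved as main.py, `python main.py --spec "0 9 * * *" --wxpusher True` would push a report every morning at nine.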
