from kafka import KafkaConsumer, KafkaProducer
import json
import time
from datetime import datetime
import oss2
def process_task(msg):
task_params = json.loads(msg.value)
item_id = task_params['itemId']
param_value = task_params['paramValue']
logger.info(f"开始处理项【{item_id}】对应参数【{param_value}】")
# 构建请求链接
url = f"{config.get('engine', 'site_base_path')}/view?param={param_value}&id={item_id}"
driver.get(url)
try:
# 简单等待页面加载
time.sleep(3) # 根据需要调整或替换为WebDriverWait
# 生成截图文件名
today = datetime.now().strftime('%Y-%m-%d')
screenshot_dir = os.path.join(config.get('engine', 'image_path'), 'images', today)
os.makedirs(screenshot_dir, exist_ok=True)
fname = os.path.join(screenshot_dir, f"{item_id}_{param_value}.png")
driver.save_screenshot(fname)
logger.info(f"保存截图到 {fname}")
# 上传至OSS(省略具体实现,根据实际情况添加)
upload_to_oss(fname)
# 发送完成通知
notify_completion(item_id, param_value, fname)
logger.info(f"完成处理项【{item_id}】对应参数【{param_value}】")
except Exception as e:
logger.error(f"处理项【{item_id}】对应参数【{param_value}】时发生异常: {e}")
def upload_to_oss(file_path):
"""上传文件到阿里云OSS"""
auth = oss2.Auth(config.get('oss', 'access_key_id'), config.get('oss', 'access_key_secret'))
bucket = oss2.Bucket(auth, config.get('oss', 'endpoint'), config.get('oss', 'bucket_name'))
remote_path = os.path.relpath(file_path, config.get('engine', 'image_path'))
bucket.put_object_from_file(remote_path, file_path)
def notify_completion(item_id, param_value, image_path):
"""发送完成通知"""
producer.send(config.get('kafka', 'notify_topic'), {
'itemId': item_id,
'paramValue': param_value,
'imagePath': image_path
})
|