# -*- coding: utf-8 -*-
# import apscheduler
import pandas as pd
from datetime import datetime
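# Import the scheduler. BackgroundScheduler runs in a background thread and does not block the caller (BlockingScheduler is the blocking variant).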
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.util import undefined
import os,time,sys,random,copy,json
from QhTestJob import Ui_Form
from PyQt5.QtWidgets import QApplication
from PyQt5 import QtWidgets
from PyQt5.QtCore import Qt
# cron定时调度(某一定时时刻执行)
# (int|str) 表示参数既可以是int类型,也可以是str类型
# (datetime | str) 表示参数既可以是datetime类型,也可以是str类型
# year (int|str) – 4-digit year -(表示四位数的年份,如2008年)
# month (int|str) – month (1-12) -(表示取值范围为1-12月)
# day (int|str) – day of the (1-31) -(表示取值范围为1-31日)
# week (int|str) – ISO week (1-53) -(格里历2006年12月31日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))
# day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) - (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)
# hour (int|str) – hour (0-23) - (表示取值范围为0-23时)
# minute (int|str) – minute (0-59) - (表示取值范围为0-59分)
# second (int|str) – second (0-59) - (表示取值范围为0-59秒)
# start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) - (表示开始时间)
# end_date (datetime|str) – latest possible date/time to trigger on (inclusive) - (表示结束时间)
# timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)
# interval 间隔调度(每隔多久执行)
# weeks (int) – number of weeks to wait
# days (int) – number of days to wait
# hours (int) – number of hours to wait
# minutes (int) – number of minutes to wait
# seconds (int) – number of seconds to wait
# start_date (datetime|str) – starting point for the interval calculation
# end_date (datetime|str) – latest possible date/time to trigger on
# timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
# date 定时调度(作业只会执行一次)
# run_date (datetime|str) – the date/time to run the job at -(任务开始的时间)
# timezone (datetime.tzinfo|str) – time zone for run_date if it doesn't have one already
def QhRR(QhVALUE,QhVALUE1):
    """Demo job callback: print both arguments followed by a fixed notice."""
    QhMessage = "{}--{}执行定时任务...".format(QhVALUE, QhVALUE1)
    print(QhMessage)
class QhApscheduler():
    """CSV-backed wrapper around an APScheduler ``BackgroundScheduler``.

    Every job is kept in two places: the scheduler's SQLAlchemy (SQLite) job
    store, and a CSV "job pool" table (``self.QhCheduPoolJobDf``) that records
    the parameters needed to re-create the job plus a status string
    ("已运行" running / "已暂停" paused / "已关闭" closed).

    Job functions are looked up by name in this module's ``globals()``.
    Positional arguments are stored as one string joined with ``+``; keyword
    arguments are stored as JSON text whose ``,`` separators are replaced by
    ``+``. Because of that encoding, argument values must not contain ``+``.
    """

    # Column template for one row of the job-pool CSV ("None" = not set).
    QhFiled = {
        "QhJabId": "None",           # job ID
        "QhJabName": "None",         # job display name
        "QhJabFuncName": "None",     # name of the function the job executes
        "QhTimesType": "None",       # schedule type: "重复" (cron) / "间隔" (interval) / "一次" (one-shot)
        "QhYear": "None",            # year
        "QhMonth": "None",           # month
        "QhDay": "None",             # day
        "QhWeek": "None",            # week
        "QhDayOfWeek": "None",       # day of week
        "QhHour": "None",            # hour
        "QhMinute": "None",          # minute
        "QhSecond": "None",          # second
        "QhJabArgs": "None",         # positional arguments ("+"-joined text)
        "QhJabKwargs": "None",       # keyword arguments (JSON text, "," written as "+")
        "QhJabStartDate": "None",    # start time
        "QhJabNextRunTime": "None",  # next run time
        "QhJabLastRunTime": "None",  # last run time
        "QhJabStatus": "None"        # job status
    }

    # Columns that hold free text and must never be parsed as numbers.
    _QhTextColumns = ("QhJabId", "QhJabName", "QhJabFuncName", "QhTimesType",
                      "QhJabArgs", "QhJabKwargs", "QhJabStatus")

    # Schedule columns; their names equal the QhAddJob keyword names.
    _QhTimeColumns = ("QhYear", "QhMonth", "QhDay", "QhWeek",
                      "QhDayOfWeek", "QhHour", "QhMinute", "QhSecond")

    def __init__(self,
                 *args, **kwargs):
        """Load the job-pool CSV, start the scheduler and re-add stored jobs."""
        # Absolute directory of this module; root for the job-pool folder.
        self.QhCheduPath = os.path.dirname(os.path.abspath(__file__))
        print(self.QhCheduPath)
        self.QhCheduPoolJobDf = self._QhinitJobPoolCsv()
        QhExecutors = {
            'default': ThreadPoolExecutor(20),
            'processpool': ProcessPoolExecutor(10)
        }
        # Anchor the SQLite file to the module directory (the same folder that
        # _QhinitJobPoolCsv creates) instead of the current working directory.
        QhSqlitePath = os.path.join(self.QhCheduPath, "QhJobPoolDb", "QhJabSqlite.sqlite")
        self.QhJobstores = {
            'default': SQLAlchemyJobStore(url='sqlite:///{}'.format(QhSqlitePath))
        }
        # Per-job defaults must be passed through ``job_defaults``; unknown
        # top-level scheduler options are silently ignored by APScheduler.
        self.Qhscheduler = BackgroundScheduler(
            jobstores=self.QhJobstores,
            executors=QhExecutors,
            job_defaults={'misfire_grace_time': 3, 'coalesce': True})
        self.Qhscheduler.start()
        self._QhInitAddJobFor()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _QhSaveCsv(self):
        """Write the in-memory job-pool table back to the CSV file."""
        self.QhCheduPoolJobDf.to_csv(self.QhCheduPoolJobCsv, index=False, encoding='gbk')

    @staticmethod
    def _QhNoneToText(QhValue):
        """Return the "None" placeholder for ``None``, otherwise the value itself."""
        return "None" if QhValue is None else QhValue

    @staticmethod
    def _QhTextOrNone(QhValue):
        """Return a text cell as ``str``; ``None`` for empty/placeholder/NaN cells."""
        if QhValue is None:
            return None
        if isinstance(QhValue, float) and QhValue != QhValue:  # NaN
            return None
        QhText = str(QhValue)
        if QhText in ("None", ""):
            return None
        return QhText

    @staticmethod
    def _QhParseArgs(QhJabArgs):
        """Convert stored/positional args into a tuple (``None`` when absent).

        Strings are split on "+"; lists/tuples are taken as they are; any
        other scalar is converted to text first.
        """
        if QhJabArgs is None or QhJabArgs == "":
            return None
        if isinstance(QhJabArgs, (tuple, list)):
            return tuple(QhJabArgs)
        return tuple(str(QhJabArgs).split("+"))

    @staticmethod
    def _QhParseKwargs(QhJabKwargs):
        """Convert stored keyword args into a dict (``None`` when absent).

        Raises:
            ValueError: the text is not valid JSON after decoding
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        """
        if QhJabKwargs is None or QhJabKwargs == "":
            return None
        if isinstance(QhJabKwargs, dict):
            return dict(QhJabKwargs)
        QhText = str(QhJabKwargs).replace("+", ',').replace("'", '"')
        return json.loads(QhText)

    @staticmethod
    def _QhResolveFunc(QhJabFuncName):
        """Look up a job function by name in this module's globals.

        Raises:
            KeyError: no module-level object with that name exists.
        """
        if QhJabFuncName not in globals():
            raise KeyError("job function '{}' is not defined in this module".format(QhJabFuncName))
        return globals()[QhJabFuncName]

    def _QhRowToJobKwargs(self, QhRow):
        """Translate one job-pool row into keyword arguments for QhAddJob."""
        QhTimes = [self.QhGeShiZH(QhRow[QhKey]) for QhKey in self._QhTimeColumns]
        print(*QhTimes)
        print(*(type(QhItem) for QhItem in QhTimes))
        QhKwargs = dict(zip(self._QhTimeColumns, QhTimes))
        QhKwargs.update({
            "QhTimesType": QhRow["QhTimesType"],
            "QhJabFuncName": QhRow["QhJabFuncName"],
            "QhName": QhRow["QhJabName"],
            # Text cells must not go through the numeric converter: "5" has
            # to stay the string "5" and "0" must not be dropped.
            "QhJabArgs": self._QhTextOrNone(QhRow["QhJabArgs"]),
            "QhJabKwargs": self._QhTextOrNone(QhRow["QhJabKwargs"]),
            "QhJabStatus": self._QhTextOrNone(QhRow["QhJabStatus"]),
        })
        return QhKwargs

    def _QhChangeJobState(self, QhJabId, QhAction, QhVerb, QhStatus, QhIsFor):
        """Shared body of pause / resume / remove.

        Args:
            QhJabId: job ID to act on.
            QhAction: scheduler method called with the job ID.
            QhVerb: verb used in the printed messages ("暂停", "恢复", "删除").
            QhStatus: status text written to the job-pool table on success.
            QhIsFor: when false, the CSV is saved immediately.
        """
        QhFailMsg = "{}失败,请检查任务ID,任务可能不存在".format(QhVerb)
        if QhJabId == "":
            print("任务ID不能空,请输入任务ID")
            return
        try:
            if not self.Qhscheduler.get_job(QhJabId):
                print(QhFailMsg)
                return
            QhAction(QhJabId)
            print("{}任务".format(QhVerb), QhJabId)
            self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId, 'QhJabStatus'] = QhStatus
            if not QhIsFor:
                self._QhSaveCsv()
        except Exception as QhErr:
            # Best effort: report and keep the caller alive, but show the
            # real cause instead of hiding it.
            print(QhFailMsg)
            print(QhErr)

    def _QhForEachJob(self, QhMethod, QhIsFor):
        """Apply ``QhMethod(job_id, QhIsFor)`` to every row; save once if batching."""
        # Snapshot the IDs: the callbacks modify the table while we loop.
        for QhJabId in list(self.QhCheduPoolJobDf["QhJabId"]):
            QhMethod(QhJabId, QhIsFor)
        # In batch mode the per-job calls skip saving, so persist once here.
        if QhIsFor:
            self._QhSaveCsv()

    # ------------------------------------------------------------------
    # initialisation
    # ------------------------------------------------------------------
    def _QhInitAddJobFor(self, QhIsFor=True):
        """Re-create every job recorded in the job-pool table.

        Args:
            QhIsFor: when true (batch mode) the CSV is written once after
                all rows were processed instead of once per job.
        """
        if self.QhCheduPoolJobDf.shape[0] == 0:
            return
        # Snapshot the rows: QhAddJob updates/rebinds the table while we loop.
        for QhInx, QhRow in list(self.QhCheduPoolJobDf.iterrows()):
            QhJabId = QhRow["QhJabId"]
            try:
                self.QhAddJob(QhJabId, QhIsFor=QhIsFor, **self._QhRowToJobKwargs(QhRow))
            except Exception as QhErr:
                # One broken row must not prevent the other jobs from loading.
                print("{} 任务加载失败: {}".format(QhJabId, QhErr))
        if QhIsFor:
            self._QhSaveCsv()

    def _QhinitJobPoolCsv(self):
        """Create the job-pool folder/CSV if missing and load the table.

        Sets ``self.QhCheduPoolJobCsv`` (CSV path) and
        ``self.QhCheduPoolJobDf`` (table, missing cells filled with "None").

        Returns:
            The loaded (or newly created, empty) job-pool DataFrame.
        """
        QhCheduPathpd = os.path.join(self.QhCheduPath, "QhJobPoolDb")
        if os.path.exists(QhCheduPathpd):
            print("{} is exist!".format(QhCheduPathpd))
        else:
            os.makedirs(QhCheduPathpd, exist_ok=True)
            print("{} is not exist, create it!".format(QhCheduPathpd))
        self.QhCheduPoolJobCsv = os.path.join(QhCheduPathpd, "QhJobPoolCsv.csv")
        if not os.path.exists(self.QhCheduPoolJobCsv):
            QhFiled = list(QhApscheduler.QhFiled.keys())
            self.QhCheduPoolJobDf = pd.DataFrame(columns=QhFiled)
            self.QhCheduPoolJobDf.to_csv(self.QhCheduPoolJobCsv, index=False, encoding='gbk')
        else:
            # Read text columns as str so numeric-looking IDs/arguments keep
            # their text form and still match scheduler job IDs.
            QhTextDtypes = {QhCol: str for QhCol in self._QhTextColumns}
            self.QhCheduPoolJobDf = pd.read_csv(self.QhCheduPoolJobCsv, encoding='gbk', dtype=QhTextDtypes)
        # Object dtype lets every column hold both numbers and the "None"
        # placeholder when rows are updated later.
        self.QhCheduPoolJobDf = self.QhCheduPoolJobDf.fillna("None").astype(object)
        return self.QhCheduPoolJobDf

    # ------------------------------------------------------------------
    # add / trigger construction
    # ------------------------------------------------------------------
    def QhAddJob(self,
                 QhJabId,
                 QhTimesType,            # "重复" (cron) / "间隔" (interval) / "一次" (one-shot)
                 QhJabFuncName,
                 QhJabArgs=None,
                 QhJabKwargs=None,
                 QhName=None,
                 QhYear = None,
                 QhMonth = None,
                 QhDay = None,
                 QhWeek = None,
                 QhDayOfWeek = None,
                 QhHour = None,
                 QhMinute = None,
                 QhSecond = None,
                 QhStartDate = None,
                 QhEndDate = None,
                 QhTimezone=None,
                 QhJitter=None,
                 misfire_grace_time=undefined,
                 next_run_time=undefined,
                 jobstore='default',
                 executor='default',
                 coalesce=undefined,
                 max_instances=undefined,
                 replace_existing=False,
                 QhIsFor = False,
                 QhJabStatus = None,
                 QhJobIsOpen = False,
                 ):
        """Add a job to the scheduler and record it in the job-pool table.

        Nothing happens when a job with this ID already exists in the
        scheduler, or when ``QhJabStatus`` is "已关闭" and ``QhJobIsOpen`` is
        false.

        Args:
            QhJabId: job ID.
            QhTimesType: schedule type, see ``QhTiggers``.
            QhJabFuncName: name of a module-level function to run.
            QhJabArgs: positional arguments as "+"-joined text (a tuple/list
                is also accepted).
            QhJabKwargs: keyword arguments as JSON-like text with "+" in
                place of "," (a dict is also accepted).
            QhName: job display name.
            QhYear .. QhJitter: forwarded to ``QhTiggers``.
            misfire_grace_time .. replace_existing: forwarded unchanged to
                ``scheduler.add_job``.
            QhIsFor: batch mode; when false the CSV is saved immediately.
            QhJabStatus: status recorded for the job in the CSV, if any.
            QhJobIsOpen: set to re-open a job whose status is "已关闭".

        Raises:
            KeyError: ``QhJabFuncName`` is not defined in this module.
            ValueError: invalid schedule parameters or keyword-argument text.
        """
        if self.Qhscheduler.get_job(QhJabId):
            print("{} is exist!".format(QhJabId))
            return  # job already scheduled, nothing to add
        if QhJabStatus == "已关闭" and (not QhJobIsOpen):
            print("{} 任务已关闭,不用新建!".format(QhJabId))
            return
        QhTiggers = self.QhTiggers(QhTimesType=QhTimesType,
                                   QhYear=QhYear,
                                   QhMonth=QhMonth,
                                   QhDay=QhDay,
                                   QhWeek=QhWeek,
                                   QhDayOfWeek=QhDayOfWeek,
                                   QhHour=QhHour,
                                   QhMinute=QhMinute,
                                   QhSecond=QhSecond,
                                   QhStartDate=QhStartDate,
                                   QhEndDate=QhEndDate,
                                   QhTimezone=QhTimezone,
                                   QhJitter=QhJitter)
        QhArgsTuple = self._QhParseArgs(QhJabArgs)
        QhKwargsDict = self._QhParseKwargs(QhJabKwargs)
        self.Qhscheduler.add_job(func=self._QhResolveFunc(QhJabFuncName),
                                 trigger=QhTiggers,
                                 args=QhArgsTuple,
                                 kwargs=QhKwargsDict,
                                 id=QhJabId,
                                 name=QhName,
                                 misfire_grace_time=misfire_grace_time,
                                 next_run_time=next_run_time,
                                 jobstore=jobstore,
                                 executor=executor,
                                 coalesce=coalesce,
                                 max_instances=max_instances,
                                 replace_existing=replace_existing,)
        # Serialise the arguments back into their storage form. Joining the
        # elements avoids the trailing "+" that the repr of a one-element
        # tuple would produce (which reloads as an extra empty argument).
        if QhArgsTuple is None:
            QhArgsText = "None"
        else:
            QhArgsText = "+".join(str(QhItem) for QhItem in QhArgsTuple)
        if QhKwargsDict is None:
            QhKwargsText = "None"
        else:
            QhKwargsText = json.dumps(QhKwargsDict, ensure_ascii=False).replace(",", "+")

        QhAddJoblDic = dict(QhApscheduler.QhFiled)
        QhAddJoblDic.update({
            "QhJabId": QhJabId,
            "QhJabName": self._QhNoneToText(QhName),
            "QhJabFuncName": self._QhNoneToText(QhJabFuncName),
            "QhTimesType": QhTimesType,
            "QhYear": self._QhNoneToText(QhYear),
            "QhMonth": self._QhNoneToText(QhMonth),
            "QhDay": self._QhNoneToText(QhDay),
            "QhWeek": self._QhNoneToText(QhWeek),
            "QhDayOfWeek": self._QhNoneToText(QhDayOfWeek),
            "QhHour": self._QhNoneToText(QhHour),
            "QhMinute": self._QhNoneToText(QhMinute),
            "QhSecond": self._QhNoneToText(QhSecond),
            "QhJabArgs": QhArgsText,
            "QhJabKwargs": QhKwargsText,
            "QhJabStatus": "已运行",
        })
        # Update the existing row for this ID, or append a new one.
        QhMask = self.QhCheduPoolJobDf["QhJabId"] == QhJabId
        if QhMask.any():
            for QhKey, QhValue in QhAddJoblDic.items():
                if QhKey == "QhJabId":
                    continue
                self.QhCheduPoolJobDf.loc[QhMask, QhKey] = QhValue
        else:
            QhCheduPoolJobDfRow = pd.DataFrame([QhAddJoblDic])
            print(QhCheduPoolJobDfRow)
            self.QhCheduPoolJobDf = pd.concat(
                [self.QhCheduPoolJobDf, QhCheduPoolJobDfRow], ignore_index=True)
        print(self.QhCheduPoolJobDf)
        # Single-add mode: persist right away.
        if not QhIsFor:
            self._QhSaveCsv()

    def QhTiggers(self, QhTimesType,
                  QhYear = None,
                  QhMonth = None,
                  QhDay = None,
                  QhWeek = None,
                  QhDayOfWeek = None,
                  QhHour = None,
                  QhMinute = None,
                  QhSecond = None,
                  QhStartDate = None,
                  QhEndDate = None,
                  QhTimezone=None,
                  QhJitter=None
                  ):
        """Build the APScheduler trigger for the given schedule type.

        Args:
            QhTimesType: "重复" -> ``CronTrigger``, "间隔" ->
                ``IntervalTrigger`` (unset parts count as 0), "一次" ->
                ``DateTrigger`` at the given year/month/day and time
                (unset hour/minute/second count as 0).

        Returns:
            The constructed trigger.

        Raises:
            ValueError: unknown ``QhTimesType``, or a one-shot job without
                year/month/day.
        """
        def QhZero(QhValue):
            return 0 if QhValue is None else QhValue

        if QhTimesType == "重复":
            return CronTrigger(year=QhYear,
                               month=QhMonth,
                               day=QhDay,
                               week=QhWeek,
                               day_of_week=QhDayOfWeek,
                               hour=QhHour,
                               minute=QhMinute,
                               second=QhSecond,
                               start_date=QhStartDate,
                               end_date=QhEndDate,
                               timezone=QhTimezone,
                               jitter=QhJitter)
        if QhTimesType == "间隔":
            return IntervalTrigger(weeks=QhZero(QhWeek),
                                   days=QhZero(QhDay),
                                   hours=QhZero(QhHour),
                                   minutes=QhZero(QhMinute),
                                   seconds=QhZero(QhSecond),
                                   start_date=QhStartDate,
                                   end_date=QhEndDate,
                                   timezone=QhTimezone,
                                   jitter=QhJitter)
        if QhTimesType == "一次":
            # datetime() has no valid value for a missing year/month/day, so
            # fail with a clear message instead of an out-of-range error.
            if QhYear is None or QhMonth is None or QhDay is None:
                raise ValueError("one-shot job requires QhYear, QhMonth and QhDay")
            QhRunDate = datetime(QhYear, QhMonth, QhDay,
                                 QhZero(QhHour), QhZero(QhMinute), QhZero(QhSecond))
            return DateTrigger(run_date=QhRunDate,
                               timezone=QhTimezone)
        raise ValueError("unknown QhTimesType: {!r}".format(QhTimesType))

    # ------------------------------------------------------------------
    # pause / resume / remove / reopen
    # ------------------------------------------------------------------
    def QhPauseJob(self, QhJabId, QhIsFor=False):
        """Pause a job and mark it "已暂停"; saves the CSV unless ``QhIsFor``."""
        self._QhChangeJobState(QhJabId, self.Qhscheduler.pause_job, "暂停", '已暂停', QhIsFor)

    def QhPauseJobFor(self, QhIsFor=True):
        """Pause every job listed in the job-pool table."""
        self._QhForEachJob(self.QhPauseJob, QhIsFor)

    def QhGetJobsState(self):
        """Print next run time and pending flag of every scheduled job."""
        for job in self.Qhscheduler.get_jobs():
            print(f"Job ID: {job.id}, 任务的下一次运行时间: {job.next_run_time}")
            print(f"Job ID: {job.id}, 任务是否有待执行: {job.pending}")

    def QhResumeJob(self, QhJabId, QhIsFor=False):
        """Resume a job and mark it "已运行"; saves the CSV unless ``QhIsFor``."""
        self._QhChangeJobState(QhJabId, self.Qhscheduler.resume_job, "恢复", '已运行', QhIsFor)

    def QhResumeJobFor(self, QhIsFor=True):
        """Resume every job listed in the job-pool table."""
        self._QhForEachJob(self.QhResumeJob, QhIsFor)

    def QhRemoveJob(self, QhJabId, QhIsFor=False):
        """Remove a job from the scheduler and mark it "已关闭".

        The CSV row is kept so the job can be re-opened later; the CSV is
        saved unless ``QhIsFor``.
        """
        self._QhChangeJobState(QhJabId, self.Qhscheduler.remove_job, "删除", '已关闭', QhIsFor)

    def QhRemoveJobFor(self, QhIsFor=True):
        """Remove every job listed in the job-pool table."""
        self._QhForEachJob(self.QhRemoveJob, QhIsFor)

    def QhReopenJob(self, QhJabId, QhIsFor=False):
        """Re-create a closed ("已关闭") job from its job-pool row.

        Does nothing when the job is still scheduled or its recorded status
        is not "已关闭". Prints a failure message when the ID is unknown or
        the job cannot be re-created.
        """
        if QhJabId == "":
            print("任务ID不能空,请输入任务ID")
            return
        QhRows = self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId]
        if QhRows.empty:
            print("打开任务失败,请检查任务ID,任务可能不存在")
            return
        QhRow = QhRows.iloc[0]
        if self.Qhscheduler.get_job(QhJabId):
            return
        if self._QhTextOrNone(QhRow['QhJabStatus']) != "已关闭":
            return
        try:
            # QhAddJob also writes the new "已运行" status.
            self.QhAddJob(QhJabId,
                          QhIsFor=QhIsFor,
                          QhJobIsOpen=True,
                          **self._QhRowToJobKwargs(QhRow))
        except Exception as QhErr:
            print("打开任务失败,请检查任务ID,任务可能不存在")
            print(QhErr)

    def QhReopenJobFor(self, QhIsFor=True):
        """Re-open every closed job listed in the job-pool table."""
        self._QhForEachJob(self.QhReopenJob, QhIsFor)

    # ------------------------------------------------------------------
    # manual execution
    # ------------------------------------------------------------------
    def QhShouDongRunJob(self, QhJabId):
        """Run a job's function once, right now, in the calling thread.

        The function and its arguments are taken from the job-pool row; the
        scheduler is not involved. Both positional and keyword arguments
        are passed, matching what the scheduled job receives.
        """
        try:
            if QhJabId == "":
                print("任务ID不能空,请输入任务ID")
                return
            QhRows = self.QhCheduPoolJobDf.loc[self.QhCheduPoolJobDf['QhJabId'] == QhJabId]
            if QhRows.empty:
                print("2手动执行任务失败,请检查任务ID,任务可能不存在")
                return
            QhRow = QhRows.iloc[0]
            QhJabFuncName = QhRow['QhJabFuncName']
            QhJabFunc = globals().get(QhJabFuncName)
            if not QhJabFunc:
                print(f"Function '{QhJabFuncName}' not found.")
                return
            QhArgs = self._QhParseArgs(self._QhTextOrNone(QhRow['QhJabArgs'])) or ()
            QhKwargs = self._QhParseKwargs(self._QhTextOrNone(QhRow['QhJabKwargs'])) or {}
            QhJabFunc(*QhArgs, **QhKwargs)
            print("手动执行任务", QhJabId)
        except Exception as e:
            print(f"An error occurred while executing the job: {e}")
            print("2手动执行任务失败,请检查任务ID,任务可能不存在")

    def QhShouDongRunJobFor(self):
        """Manually run every job listed in the job-pool table."""
        for QhJabId in list(self.QhCheduPoolJobDf["QhJabId"]):
            self.QhShouDongRunJob(QhJabId)

    def QhXiuGaiJob(self, QhJabId,
                    QhIsFor=True):
        """Modify a job (not implemented yet).

        Planned approach per the original note: remove the job, change its
        parameters, then add it again.
        """
        # TODO: implement (reload the row, remove the job, re-add it).
        pass

    # ------------------------------------------------------------------
    # value conversion
    # ------------------------------------------------------------------
    def QhGeShiZH(self, QhValue):
        """Normalise a schedule cell read from the CSV.

        * "None", "" and "0" (as text) -> ``None``
        * integer text, or float text with an integral value such as "12.0"
          (written when a numeric column was stored as float) -> ``int``
        * any other text (e.g. a cron expression) -> returned unchanged
        * non-text values -> ``int(value)``, or ``None`` when that fails
          (for example NaN)

        NOTE(review): the text "0" maps to ``None`` while the number 0 stays
        0 — kept as in the original; confirm this asymmetry is intended.
        """
        if isinstance(QhValue, str):
            if QhValue in ("None", "", "0"):
                return None
            try:
                return int(QhValue)
            except ValueError:
                pass
            try:
                QhNumber = float(QhValue)
            except ValueError:
                return QhValue
            if QhNumber.is_integer():
                return int(QhNumber)
            return QhValue
        try:
            return int(QhValue)
        except (TypeError, ValueError, OverflowError):
            return None
class QhTestApschedulerGui(QtWidgets.QWidget,Ui_Form):
    """Test window: wires the buttons of ``Ui_Form`` to a ``QhApscheduler``."""

    def __init__(self, parent=None):
        """Build the form, create the scheduler wrapper and connect buttons."""
        super().__init__(parent)
        self.setupUi(self)
        self.aa = QhApscheduler()
        # (button, slot) pairs, connected in this order.
        QhWiring = (
            (self.pushButton_2, self.QhPauseJob),
            (self.pushButton_18, self.QhResumeJob),
            (self.pushButton_19, self.aa.QhGetJobsState),
            (self.pushButton, self.QhAddJob),
            (self.pushButton_3, self.QhRemoveJob),
            (self.pushButton_20, self.aa.QhPauseJobFor),
            (self.pushButton_21, self.aa.QhResumeJobFor),
            (self.pushButton_22, self.aa.QhRemoveJobFor),
            (self.pushButton_23, self.aa.QhReopenJobFor),
            (self.pushButton_6, self.QhReopenJob),
            (self.pushButton_4, self.QhShouDongRunJob),
            (self.pushButton_24, self.aa.QhShouDongRunJobFor),
        )
        for QhButton, QhSlot in QhWiring:
            QhButton.clicked.connect(QhSlot)

    def _QhReadJobId(self, QhLineEdit):
        """Return the text of a line edit after echoing it to stdout."""
        QhJobId = QhLineEdit.text()
        print(QhJobId)
        return QhJobId

    def QhAddJob(self):
        """Add an interval test job (random 8-20 s period) running ``QhRR``."""
        QhJobId = self.lineEdit_5.text()
        if QhJobId == "":
            print("任务ID不能空,请输入任务ID")
            return
        QhJobName = "{}-Name".format(QhJobId)
        QhScoends = random.randint(8, 20)
        self.aa.QhAddJob(
            QhJobId,
            QhTimesType="间隔",
            QhJabFuncName="QhRR",
            QhName=QhJobName,
            QhSecond=QhScoends,
            QhJabArgs="{QhJobId}-测试顺序+QUEHUI".format(QhJobId=QhJobId),
        )
        print("添加任务{}".format(QhJobId))

    def QhPauseJob(self):
        """Pause the job whose ID is typed into ``lineEdit``."""
        self.aa.QhPauseJob(self._QhReadJobId(self.lineEdit))

    def QhResumeJob(self):
        """Resume the job whose ID is typed into ``lineEdit_18``."""
        self.aa.QhResumeJob(self._QhReadJobId(self.lineEdit_18))

    def QhRemoveJob(self):
        """Remove the job whose ID is typed into ``lineEdit_2``."""
        self.aa.QhRemoveJob(self._QhReadJobId(self.lineEdit_2))

    def QhReopenJob(self):
        """Re-open the job whose ID is typed into ``lineEdit_6``."""
        self.aa.QhReopenJob(self._QhReadJobId(self.lineEdit_6))

    def QhShouDongRunJob(self):
        """Manually run the job whose ID is typed into ``lineEdit_3``."""
        self.aa.QhShouDongRunJob(self._QhReadJobId(self.lineEdit_3))
if __name__ == '__main__':
    # A headless variant (create QhApscheduler, then sleep in a loop and call
    # Qhscheduler.shutdown() on KeyboardInterrupt/SystemExit) was sketched
    # here originally; the GUI entry point below is the one in use.

    # High-DPI options are set before the QApplication instance is created.
    QApplication.setHighDpiScaleFactorRoundingPolicy(
        Qt.HighDpiScaleFactorRoundingPolicy.PassThrough)
    for qh_attr in (Qt.AA_EnableHighDpiScaling, Qt.AA_UseHighDpiPixmaps):
        QApplication.setAttribute(qh_attr)

    qh_app = QApplication(sys.argv)        # application instance
    qh_MyWindows = QhTestApschedulerGui()  # main window
    qh_MyWindows.show()

    # exec() runs the event loop and returns an integer when the window exits.
    qh_n = qh_app.exec()
    print(qh_n)
    try:
        sys.exit(qh_n)  # ask the interpreter to terminate
    except SystemExit:
        # Hook for work to do just before the interpreter stops.
        print("请在此做一些其他工作。")
|