219 lines
8.0 KiB
C#
219 lines
8.0 KiB
C#
|
|
using System;
|
|||
|
|
using System.Collections.Concurrent;
|
|||
|
|
using System.Collections.Generic;
|
|||
|
|
using System.Linq;
|
|||
|
|
using System.Threading;
|
|||
|
|
using MegaRobo.Logger;
|
|||
|
|
|
|||
|
|
namespace MegaRobo.RRQuartz;
|
|||
|
|
|
|||
|
|
public class JobStore
|
|||
|
|
{
|
|||
|
|
private Thread _threadLoop;
|
|||
|
|
private readonly BlockingCollection<JobItemBase> _jobQueues;
|
|||
|
|
public bool IsStop { get; private set; } = true;
|
|||
|
|
public ILogger Logger { get; set; }
|
|||
|
|
|
|||
|
|
public TimeSpan LoopInterval { get; set; } = TimeSpan.FromMilliseconds(200);
|
|||
|
|
|
|||
|
|
public IList<JobItemBase> JobPool { get; private set; } = new List<JobItemBase>();
|
|||
|
|
|
|||
|
|
public JobItemBase CurrentJob { get; private set; }
|
|||
|
|
|
|||
|
|
public event EventHandler<JobItemBase> TryCountMoreEvent;
|
|||
|
|
public event EventHandler<JobItemBase> JobItemChangedEvent;
|
|||
|
|
// public event EventHandler<JobItemBase> JobItemStatusChangedEvent;
|
|||
|
|
|
|||
|
|
public Action<JobItemBase> CheckCompletedAction { get; set; }
|
|||
|
|
|
|||
|
|
public IList<JobItemBase> JobQueues => _jobQueues.ToList();
|
|||
|
|
public JobStore() : this(50)
|
|||
|
|
{
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
public JobStore(int maxQueueCount)
|
|||
|
|
{
|
|||
|
|
this._jobQueues = new BlockingCollection<JobItemBase>(maxQueueCount);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
public void Start()
|
|||
|
|
{
|
|||
|
|
this.IsStop = false;
|
|||
|
|
if (this._threadLoop is not null) return;
|
|||
|
|
this._threadLoop = new Thread(this.ProcessQueue) { IsBackground = true, Name = nameof(JobStore) + "_" + Guid.NewGuid().ToString("N") };
|
|||
|
|
this._threadLoop?.Start();
|
|||
|
|
this.CurrentJob?.UsedTime.Start();
|
|||
|
|
this.Logger?.LogInformation("已启动..");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
public void Stop()
|
|||
|
|
{
|
|||
|
|
this.IsStop = true;
|
|||
|
|
this.CurrentJob?.Stop();
|
|||
|
|
this.Logger?.LogInformation("已暂停.");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
public void Add(JobItemBase jobItem)
|
|||
|
|
{
|
|||
|
|
this.Logger.LogDebug($"新增JobItem:{jobItem.Title}");
|
|||
|
|
this.JobPool.Add(jobItem);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
public void Executes(params string[] jobItemNames)
|
|||
|
|
{
|
|||
|
|
jobItemNames = jobItemNames.Where(x => !string.IsNullOrWhiteSpace(x)).Select(x => x.ToLower()).ToArray();
|
|||
|
|
if (jobItemNames.Length == 0)
|
|||
|
|
return;
|
|||
|
|
// var jobItems = this.JobPool.Where(x => jobItemNames.Contains(x.Name.ToLower())).ToList();
|
|||
|
|
foreach (var itemName in jobItemNames)
|
|||
|
|
{
|
|||
|
|
var item = this.JobPool.FirstOrDefault(x => x.Name == itemName);
|
|||
|
|
if (item is null)
|
|||
|
|
continue;
|
|||
|
|
JobItemBase jobItemBase = item switch
|
|||
|
|
{
|
|||
|
|
JobItem jobItem => new JobItem(jobItem),
|
|||
|
|
JobItemAsync jobItemAsync => new JobItemAsync(jobItemAsync),
|
|||
|
|
_ => null
|
|||
|
|
};
|
|||
|
|
if (jobItemBase is null)
|
|||
|
|
return;
|
|||
|
|
this.Logger.LogDebug($"新增JobItem:{jobItemBase.Title} Guid:{jobItemBase.Guid}");
|
|||
|
|
this._jobQueues.TryAdd(jobItemBase);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
public string GetRunWarnText()
|
|||
|
|
{
|
|||
|
|
if (this.IsStop)
|
|||
|
|
return "暂停中..";
|
|||
|
|
if (this._threadLoop == null)
|
|||
|
|
return "未启动";
|
|||
|
|
if (this.CurrentJob == null && this._jobQueues.Count == 0)
|
|||
|
|
return "等待新任务";
|
|||
|
|
if (this.CurrentJob?.ExecCount > this.CurrentJob?.TryCount)
|
|||
|
|
return "执行超时";
|
|||
|
|
return null;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
private void ProcessQueue()
|
|||
|
|
{
|
|||
|
|
while (!this.IsStop)
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
if (this.CurrentJob is null)
|
|||
|
|
{
|
|||
|
|
if (!this._jobQueues.IsCompleted)
|
|||
|
|
{
|
|||
|
|
this.CurrentJob = this._jobQueues.Take();
|
|||
|
|
// if(this._jobQueues.TryTake(out JobItemBase jobItem))
|
|||
|
|
// {
|
|||
|
|
// this.CurrentJob = jobItem;
|
|||
|
|
// this.JobItemChangedEvent?.Invoke(this, jobItem);
|
|||
|
|
// }
|
|||
|
|
this.JobItemChangedEvent?.Invoke(this, this.CurrentJob);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
this.ProcessJobItem();
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
this.Logger.LogException(ex);
|
|||
|
|
}
|
|||
|
|
finally
|
|||
|
|
{
|
|||
|
|
Thread.Sleep(this.LoopInterval);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
this._threadLoop = null;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
private async void ProcessJobItem()
|
|||
|
|
{
|
|||
|
|
if (this.CurrentJob is null)
|
|||
|
|
return;
|
|||
|
|
|
|||
|
|
if (this.CurrentJob.TryCount > 0) // 有限的重试次数
|
|||
|
|
{
|
|||
|
|
if (this.CurrentJob.ExecCount >= this.CurrentJob.TryCount) //如果超过重试次数
|
|||
|
|
{
|
|||
|
|
if (this.CurrentJob.ExecCount == this.CurrentJob.TryCount)
|
|||
|
|
{
|
|||
|
|
this.CurrentJob.ExecCount++;
|
|||
|
|
this.TryCountMoreEvent?.Invoke(this, null);
|
|||
|
|
return;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
else if (this.CurrentJob.Status == JobStatus.RunFailed || this.CurrentJob.CheckTimeout()) //执行失败或者超时
|
|||
|
|
{
|
|||
|
|
string errMsg = "";
|
|||
|
|
if (this.CurrentJob.Status == JobStatus.RunFailed)
|
|||
|
|
{
|
|||
|
|
errMsg = "失败";
|
|||
|
|
}
|
|||
|
|
else if (this.CurrentJob.CheckTimeout())
|
|||
|
|
{
|
|||
|
|
var timeout = this.CurrentJob.UsedTime.Elapsed - this.CurrentJob.Timeout;
|
|||
|
|
errMsg = $"超时{this.CurrentJob.UsedTime.Elapsed.TotalSeconds:f1}/{this.CurrentJob.Timeout.TotalSeconds:f1}s";
|
|||
|
|
}
|
|||
|
|
this.Logger?.LogInformation($"执行Job{errMsg},Name:{this.CurrentJob.Title} Guid:{this.CurrentJob.Guid},正在进行第{this.CurrentJob.ExecCount + 1}/{this.CurrentJob.TryCount}重试");
|
|||
|
|
await this.CurrentJob.OnExecuteActionAsync();
|
|||
|
|
if (this.CurrentJob != null)
|
|||
|
|
{
|
|||
|
|
this.Logger?.LogDebug($"已执行Job,Name: {this.CurrentJob.Title} Guid:{this.CurrentJob.Guid}");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
if (this.CurrentJob is null)
|
|||
|
|
return;
|
|||
|
|
switch (this.CurrentJob.Status)
|
|||
|
|
{
|
|||
|
|
//首次执行;或者手动重发
|
|||
|
|
case JobStatus.None:
|
|||
|
|
this.Logger?.LogInformation($"准备执行Job,Name: {this.CurrentJob.Title} Guid:{this.CurrentJob.Guid}");
|
|||
|
|
// if(this.CurrentJob is JobItem jobItem)
|
|||
|
|
// jobItem.OnExecuteAction();
|
|||
|
|
// else if(this.CurrentJob is JobItemAsync jobItemAsync)
|
|||
|
|
// jobItemAsync.OnExecuteAction();
|
|||
|
|
await this.CurrentJob.OnExecuteActionAsync();
|
|||
|
|
this.Logger?.LogDebug($"已执行Job,Name: {this.CurrentJob?.Title} Guid:{this.CurrentJob?.Guid}");
|
|||
|
|
break;
|
|||
|
|
case JobStatus.RunSuccess:
|
|||
|
|
{
|
|||
|
|
if (this.CurrentJob is JobItem jobItem)
|
|||
|
|
{
|
|||
|
|
jobItem.ResultAction?.Invoke(this.CurrentJob);
|
|||
|
|
}
|
|||
|
|
else if (this.CurrentJob is JobItemAsync jobItemAsync)
|
|||
|
|
{
|
|||
|
|
jobItemAsync.ResultAction?.Invoke(this.CurrentJob);
|
|||
|
|
}
|
|||
|
|
else if (string.IsNullOrWhiteSpace(this.CurrentJob.RunSuccessWithJobName))
|
|||
|
|
this.Executes(this.CurrentJob.RunSuccessWithJobName);
|
|||
|
|
this.CurrentJob = null;
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
case JobStatus.HasSend:
|
|||
|
|
case JobStatus.Ack:
|
|||
|
|
this.CheckCompletedAction?.Invoke(this.CurrentJob);
|
|||
|
|
break;
|
|||
|
|
case JobStatus.RunFailed:
|
|||
|
|
{
|
|||
|
|
if (this.CurrentJob is JobItem jobItem)
|
|||
|
|
{
|
|||
|
|
jobItem.ResultAction?.Invoke(this.CurrentJob);
|
|||
|
|
}
|
|||
|
|
else if (this.CurrentJob is JobItemAsync jobItemAsync)
|
|||
|
|
{
|
|||
|
|
jobItemAsync.ResultAction?.Invoke(this.CurrentJob);
|
|||
|
|
}
|
|||
|
|
else
|
|||
|
|
if (!string.IsNullOrWhiteSpace(this.CurrentJob.RunFailedWithJobName))
|
|||
|
|
this.Executes(this.CurrentJob.RunFailedWithJobName);
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|