// Source path: C00225155-02/C00225155/MegaRobo.C00225155/MegaRobo.RRQuartz/JobStore.cs
// Repository viewer metadata captured with this file: 219 lines, 8.0 KiB, C#

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MegaRobo.Logger;
namespace MegaRobo.RRQuartz;
/// <summary>
/// Holds a pool of job templates (<see cref="JobPool"/>) and a bounded queue of job
/// copies waiting to run. A background worker thread takes one job at a time from the
/// queue, exposes it as <see cref="CurrentJob"/>, and drives it according to its
/// <c>Status</c> on every loop tick.
/// </summary>
public class JobStore
{
    // Serialises worker-thread creation (Start) against worker-thread exit (ProcessQueue),
    // so Start cannot observe a thread reference that is just about to be cleared.
    private readonly object _gate = new object();

    private Thread _threadLoop;
    private readonly BlockingCollection<JobItemBase> _jobQueues;

    /// <summary>True while the store is stopped/paused. The store starts in the stopped state.</summary>
    public bool IsStop { get; private set; } = true;

    /// <summary>Optional logger; every use in this class tolerates <c>null</c>.</summary>
    public ILogger Logger { get; set; }

    /// <summary>
    /// Pause between two worker iterations, and the maximum time one iteration waits
    /// for a queued job to arrive.
    /// </summary>
    public TimeSpan LoopInterval { get; set; } = TimeSpan.FromMilliseconds(200);

    /// <summary>Job templates that <see cref="Executes"/> looks up by name.</summary>
    public IList<JobItemBase> JobPool { get; private set; } = new List<JobItemBase>();

    /// <summary>The job currently being driven by the worker, or <c>null</c> when there is none.</summary>
    public JobItemBase CurrentJob { get; private set; }

    /// <summary>
    /// Raised once when the current job's <c>ExecCount</c> reaches its <c>TryCount</c>.
    /// The event argument is the affected job.
    /// </summary>
    public event EventHandler<JobItemBase> TryCountMoreEvent;

    /// <summary>Raised when the worker takes a new job from the queue; the argument is that job.</summary>
    public event EventHandler<JobItemBase> JobItemChangedEvent;

    /// <summary>Callback invoked on every tick while the current job's status is <c>HasSend</c> or <c>Ack</c>.</summary>
    public Action<JobItemBase> CheckCompletedAction { get; set; }

    /// <summary>A copy of the jobs currently waiting in the queue.</summary>
    public IList<JobItemBase> JobQueues => _jobQueues.ToList();

    /// <summary>Creates a store whose queue holds at most 50 pending jobs.</summary>
    public JobStore() : this(50)
    {
    }

    /// <summary>Creates a store whose queue holds at most <paramref name="maxQueueCount"/> pending jobs.</summary>
    /// <param name="maxQueueCount">Bounded capacity passed to the underlying <see cref="BlockingCollection{T}"/>.</param>
    public JobStore(int maxQueueCount)
    {
        this._jobQueues = new BlockingCollection<JobItemBase>(maxQueueCount);
    }

    /// <summary>
    /// Starts (or resumes) processing. A worker thread is created only when none exists;
    /// the current job's <c>UsedTime</c> is started and the start is logged on every call,
    /// so resuming while the previous worker is still alive behaves the same as a fresh start.
    /// </summary>
    public void Start()
    {
        lock (this._gate)
        {
            this.IsStop = false;
            if (this._threadLoop is null)
            {
                this._threadLoop = new Thread(this.ProcessQueue)
                {
                    IsBackground = true,
                    Name = nameof(JobStore) + "_" + Guid.NewGuid().ToString("N")
                };
                this._threadLoop.Start();
            }
        }

        this.CurrentJob?.UsedTime.Start();
        this.Logger?.LogInformation("已启动..");
    }

    /// <summary>
    /// Pauses processing: marks the store as stopped and calls <c>Stop()</c> on the current job.
    /// The worker thread exits at its next loop check; <see cref="Start"/> creates a new one.
    /// </summary>
    public void Stop()
    {
        this.IsStop = true;
        this.CurrentJob?.Stop();
        this.Logger?.LogInformation("已暂停.");
    }

    /// <summary>Registers a job template in <see cref="JobPool"/>.</summary>
    /// <param name="jobItem">Template to register.</param>
    /// <exception cref="ArgumentNullException"><paramref name="jobItem"/> is <c>null</c>.</exception>
    public void Add(JobItemBase jobItem)
    {
        if (jobItem is null)
        {
            throw new ArgumentNullException(nameof(jobItem));
        }

        this.Logger?.LogDebug($"新增JobItem:{jobItem.Title}");
        this.JobPool.Add(jobItem);
    }

    /// <summary>
    /// Enqueues a fresh copy of each named template found in <see cref="JobPool"/>.
    /// Names are matched case-insensitively; blank names, unknown names and templates of
    /// an unsupported subtype are skipped without affecting the remaining names.
    /// </summary>
    /// <param name="jobItemNames">Template names to enqueue. <c>null</c> or empty does nothing.</param>
    public void Executes(params string[] jobItemNames)
    {
        if (jobItemNames is null)
        {
            return;
        }

        foreach (var itemName in jobItemNames.Where(x => !string.IsNullOrWhiteSpace(x)))
        {
            var template = this.JobPool.FirstOrDefault(
                x => string.Equals(x?.Name, itemName, StringComparison.OrdinalIgnoreCase));
            if (template is null)
            {
                continue;
            }

            // Each run works on its own copy so the pooled template is never mutated.
            JobItemBase copy = template switch
            {
                JobItem jobItem => new JobItem(jobItem),
                JobItemAsync jobItemAsync => new JobItemAsync(jobItemAsync),
                _ => null
            };
            if (copy is null)
            {
                // Unsupported subtype: skip this name only, keep processing the others.
                continue;
            }

            this.Logger?.LogDebug($"新增JobItem:{copy.Title} Guid:{copy.Guid}");
            if (!this._jobQueues.TryAdd(copy))
            {
                // The queue is bounded; make a dropped job visible instead of losing it silently.
                this.Logger?.LogInformation($"Job队列已满,无法新增JobItem:{copy.Title} Guid:{copy.Guid}");
            }
        }
    }

    /// <summary>
    /// Returns a short status text describing why nothing is progressing, or <c>null</c>
    /// when none of the checked conditions applies.
    /// </summary>
    public string GetRunWarnText()
    {
        if (this.IsStop)
            return "暂停中..";
        if (this._threadLoop == null)
            return "未启动";

        // Read CurrentJob once so both checks below see the same job.
        var job = this.CurrentJob;
        if (job is null && this._jobQueues.Count == 0)
            return "等待新任务";
        if (job is not null && job.ExecCount > job.TryCount)
            return "执行超时";
        return null;
    }

    /// <summary>
    /// Worker loop: while the store is running, pick the next queued job when idle and
    /// drive the current job one step, then sleep for <see cref="LoopInterval"/>.
    /// </summary>
    private void ProcessQueue()
    {
        while (true)
        {
            lock (this._gate)
            {
                if (this.IsStop)
                {
                    // Clearing the reference under the lock lets a later Start() create a new worker.
                    this._threadLoop = null;
                    return;
                }
            }

            try
            {
                // A timed TryTake (rather than a blocking Take) keeps the loop responsive to
                // Stop(): a blocking Take would dequeue and run a job that arrives after Stop().
                if (this.CurrentJob is null
                    && this._jobQueues.TryTake(out JobItemBase next, this.LoopInterval))
                {
                    this.CurrentJob = next;
                    this.JobItemChangedEvent?.Invoke(this, next);
                }

                if (!this.IsStop)
                {
                    // Intentionally not awaited: the loop keeps ticking while a job's action
                    // is still pending. Failures are handled inside the method.
                    _ = this.ProcessJobItemAsync();
                }
            }
            catch (Exception ex)
            {
                this.Logger?.LogException(ex);
            }
            finally
            {
                Thread.Sleep(this.LoopInterval);
            }
        }
    }

    /// <summary>
    /// Drives the current job one step: handles retry bookkeeping when the job has a
    /// positive <c>TryCount</c>, then acts on the job's <c>Status</c>.
    /// All exceptions are caught and logged here because the caller does not await the task.
    /// </summary>
    private async Task ProcessJobItemAsync()
    {
        try
        {
            // Work on a snapshot: overlapping invocations may clear CurrentJob at any time.
            JobItemBase job = this.CurrentJob;
            if (job is null)
                return;

            if (job.TryCount > 0) // limited number of attempts
            {
                if (job.ExecCount >= job.TryCount) // attempts exhausted
                {
                    if (job.ExecCount == job.TryCount)
                    {
                        // Bump the counter past the limit so the event is raised only once.
                        job.ExecCount++;
                        this.TryCountMoreEvent?.Invoke(this, job);
                        return;
                    }
                }
                else if (job.Status == JobStatus.RunFailed || job.CheckTimeout()) // failed or timed out
                {
                    string errMsg = "";
                    if (job.Status == JobStatus.RunFailed)
                    {
                        errMsg = "失败";
                    }
                    else if (job.CheckTimeout())
                    {
                        errMsg = $"超时{job.UsedTime.Elapsed.TotalSeconds:f1}/{job.Timeout.TotalSeconds:f1}s";
                    }

                    this.Logger?.LogInformation($"执行Job{errMsg},Name:{job.Title} Guid:{job.Guid},正在进行第{job.ExecCount + 1}/{job.TryCount}重试");
                    await job.OnExecuteActionAsync();
                    this.Logger?.LogDebug($"已执行Job,Name: {job.Title} Guid:{job.Guid}");
                }
            }

            // The job may have been finished and cleared while the retry was pending;
            // if so, the next tick deals with whatever is current then.
            if (!ReferenceEquals(this.CurrentJob, job))
                return;

            switch (job.Status)
            {
                // First execution, or a manual resend.
                case JobStatus.None:
                    this.Logger?.LogInformation($"准备执行Job,Name: {job.Title} Guid:{job.Guid}");
                    await job.OnExecuteActionAsync();
                    this.Logger?.LogDebug($"已执行Job,Name: {job.Title} Guid:{job.Guid}");
                    break;

                case JobStatus.RunSuccess:
                    this.DispatchResult(job, job.RunSuccessWithJobName);
                    // Finished: free the slot so the worker can take the next queued job.
                    if (ReferenceEquals(this.CurrentJob, job))
                    {
                        this.CurrentJob = null;
                    }
                    break;

                case JobStatus.HasSend:
                case JobStatus.Ack:
                    this.CheckCompletedAction?.Invoke(job);
                    break;

                case JobStatus.RunFailed:
                    this.DispatchResult(job, job.RunFailedWithJobName);
                    break;
            }
        }
        catch (Exception ex)
        {
            // Nothing awaits this task, so an escaping exception would otherwise go unobserved.
            this.Logger?.LogException(ex);
        }
    }

    /// <summary>
    /// Reports a finished job: invokes the job's <c>ResultAction</c> when it is a
    /// <c>JobItem</c> or <c>JobItemAsync</c>; for any other job type, enqueues the named
    /// follow-up job when a non-blank name is configured.
    /// </summary>
    private void DispatchResult(JobItemBase job, string followUpJobName)
    {
        if (job is JobItem syncJob)
        {
            syncJob.ResultAction?.Invoke(job);
        }
        else if (job is JobItemAsync asyncJob)
        {
            asyncJob.ResultAction?.Invoke(job);
        }
        else if (!string.IsNullOrWhiteSpace(followUpJobName))
        {
            this.Executes(followUpJobName);
        }
    }
}