using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MegaRobo.Logger;

namespace MegaRobo.RRQuartz;

/// <summary>
/// Holds a pool of job templates and a bounded queue of job instances, and drives
/// one job at a time on a dedicated background thread.
/// </summary>
/// <remarks>
/// Templates are registered with <see cref="Add"/>. <see cref="Executes"/> copies the
/// named templates into the queue. The worker thread started by <see cref="Start"/>
/// dequeues one job, exposes it as <see cref="CurrentJob"/> and re-evaluates it every
/// <see cref="LoopInterval"/> until its status becomes <c>RunSuccess</c>.
/// </remarks>
public class JobStore
{
    // Guards the worker-thread handle so that a restart cannot race with the worker exiting.
    private readonly object _lifecycleGate = new object();

    private readonly BlockingCollection<JobItemBase> _jobQueues;

    private Thread _threadLoop;

    /// <summary>True while the store is stopped; the initial state is stopped.</summary>
    public bool IsStop { get; private set; } = true;

    /// <summary>Optional logger. Every use is null-tolerant.</summary>
    public ILogger Logger { get; set; }

    /// <summary>Delay between two iterations of the worker loop (200 ms by default).</summary>
    public TimeSpan LoopInterval { get; set; } = TimeSpan.FromMilliseconds(200);

    /// <summary>Registered job templates that <see cref="Executes"/> looks up by name.</summary>
    public IList<JobItemBase> JobPool { get; private set; } = new List<JobItemBase>();

    /// <summary>The job currently being processed, or <c>null</c> when there is none.</summary>
    public JobItemBase CurrentJob { get; private set; }

    /// <summary>
    /// Raised once when the current job's <c>ExecCount</c> reaches its <c>TryCount</c>.
    /// The event argument is always <c>null</c>.
    /// </summary>
    public event EventHandler TryCountMoreEvent;

    /// <summary>Raised when a job is dequeued and becomes <see cref="CurrentJob"/>.</summary>
    public event EventHandler<JobItemBase> JobItemChangedEvent;

    /// <summary>Callback invoked on every loop iteration while the current job is in <c>HasSend</c> or <c>Ack</c> status.</summary>
    public Action<JobItemBase> CheckCompletedAction { get; set; }

    /// <summary>A snapshot copy of the jobs currently waiting in the queue.</summary>
    public IList<JobItemBase> JobQueues => this._jobQueues.ToList();

    /// <summary>Creates a store whose queue holds at most 50 jobs.</summary>
    public JobStore() : this(50)
    {
    }

    /// <summary>Creates a store with the given queue capacity.</summary>
    /// <param name="maxQueueCount">Upper bound of the job queue.</param>
    public JobStore(int maxQueueCount)
    {
        this._jobQueues = new BlockingCollection<JobItemBase>(maxQueueCount);
    }

    /// <summary>
    /// Starts (or resumes) processing. Creates the worker thread when none is alive and
    /// restarts the <c>UsedTime</c> timer of the current job.
    /// </summary>
    public void Start()
    {
        lock (this._lifecycleGate)
        {
            bool wasStopped = this.IsStop;
            this.IsStop = false;

            if (this._threadLoop is null)
            {
                this._threadLoop = new Thread(this.ProcessQueue)
                {
                    IsBackground = true,
                    Name = nameof(JobStore) + "_" + Guid.NewGuid().ToString("N")
                };
                this._threadLoop.Start();
            }
            else if (!wasStopped)
            {
                // Already running: nothing to resume.
                return;
            }
        }

        // Also reached when the previous worker has not exited yet after Stop(),
        // so the job timer paused by Stop() is resumed in that case too.
        this.CurrentJob?.UsedTime.Start();
        this.Logger?.LogInformation("已启动..");
    }

    /// <summary>
    /// Requests the worker loop to stop and calls <c>Stop()</c> on the current job.
    /// The worker thread exits at its next iteration.
    /// </summary>
    public void Stop()
    {
        this.IsStop = true;
        this.CurrentJob?.Stop();
        this.Logger?.LogInformation("已暂停.");
    }

    /// <summary>Registers a job template in <see cref="JobPool"/>.</summary>
    /// <param name="jobItem">Template to register.</param>
    /// <exception cref="ArgumentNullException"><paramref name="jobItem"/> is <c>null</c>.</exception>
    public void Add(JobItemBase jobItem)
    {
        if (jobItem is null)
        {
            throw new ArgumentNullException(nameof(jobItem));
        }

        this.Logger?.LogDebug($"新增JobItem:{jobItem.Title}");
        this.JobPool.Add(jobItem);
    }

    /// <summary>
    /// Enqueues a fresh copy of each named template. Names are matched case-insensitively;
    /// blank names, unknown names and unsupported template types are skipped.
    /// </summary>
    /// <param name="jobItemNames">Names of the templates to run.</param>
    public void Executes(params string[] jobItemNames)
    {
        if (jobItemNames is null)
        {
            return;
        }

        foreach (var itemName in jobItemNames.Where(x => !string.IsNullOrWhiteSpace(x)))
        {
            var template = this.JobPool.FirstOrDefault(
                x => string.Equals(x.Name, itemName, StringComparison.OrdinalIgnoreCase));
            if (template is null)
            {
                continue;
            }

            // Each run gets its own copy so the template in the pool is never queued itself.
            JobItemBase queued = template switch
            {
                JobItem jobItem => new JobItem(jobItem),
                JobItemAsync jobItemAsync => new JobItemAsync(jobItemAsync),
                _ => null
            };
            if (queued is null)
            {
                // Unsupported template type: skip it but keep processing the remaining names.
                continue;
            }

            this.Logger?.LogDebug($"新增JobItem:{queued.Title} Guid:{queued.Guid}");
            if (!this._jobQueues.TryAdd(queued))
            {
                // The queue is bounded; report the drop instead of losing the job silently.
                this.Logger?.LogInformation($"Job队列已满,丢弃JobItem:{queued.Title} Guid:{queued.Guid}");
            }
        }
    }

    /// <summary>Describes why nothing is currently progressing.</summary>
    /// <returns>
    /// A warning text when the store is stopped, not started, idle, or the current job has
    /// exceeded its retry count; otherwise <c>null</c>.
    /// </returns>
    public string GetRunWarnText()
    {
        if (this.IsStop)
        {
            return "暂停中..";
        }

        if (this._threadLoop == null)
        {
            return "未启动";
        }

        var job = this.CurrentJob;
        if (job == null && this._jobQueues.Count == 0)
        {
            return "等待新任务";
        }

        if (job?.ExecCount > job?.TryCount)
        {
            return "执行超时";
        }

        return null;
    }

    /// <summary>
    /// Worker-thread body: while not stopped, picks the next queued job when none is current,
    /// triggers one processing step, then sleeps for <see cref="LoopInterval"/>.
    /// </summary>
    private void ProcessQueue()
    {
        while (true)
        {
            lock (this._lifecycleGate)
            {
                if (this.IsStop)
                {
                    // Cleared under the gate so a concurrent Start() either reuses this
                    // thread (flag already reset) or reliably creates a new one.
                    this._threadLoop = null;
                    return;
                }
            }

            try
            {
                // Non-blocking take: a blocking Take() would keep an idle worker from ever
                // seeing Stop(), and would run a job that was enqueued while stopped.
                if (this.CurrentJob is null && this._jobQueues.TryTake(out JobItemBase next))
                {
                    this.CurrentJob = next;
                    this.JobItemChangedEvent?.Invoke(this, next);
                }

                // Started without waiting, as before; the step logs its own failures.
                _ = this.ProcessJobItemAsync();
            }
            catch (Exception ex)
            {
                this.Logger?.LogException(ex);
            }
            finally
            {
                Thread.Sleep(this.LoopInterval);
            }
        }
    }

    /// <summary>
    /// Performs one processing step for <see cref="CurrentJob"/>: retry handling first,
    /// then dispatch on the job status. Exceptions are logged, never propagated.
    /// </summary>
    private async Task ProcessJobItemAsync()
    {
        try
        {
            var job = this.CurrentJob;
            if (job is null)
            {
                return;
            }

            if (job.TryCount > 0) // a finite number of retries is configured
            {
                if (job.ExecCount >= job.TryCount) // retry budget used up
                {
                    if (job.ExecCount == job.TryCount)
                    {
                        // Bump the counter past the limit so the event is raised only once.
                        job.ExecCount++;
                        this.TryCountMoreEvent?.Invoke(this, null);
                        return;
                    }
                }
                else
                {
                    bool failed = job.Status == JobStatus.RunFailed;
                    if (failed || job.CheckTimeout()) // execution failed or timed out
                    {
                        string errMsg = failed
                            ? "失败"
                            : $"超时{job.UsedTime.Elapsed.TotalSeconds:f1}/{job.Timeout.TotalSeconds:f1}s";

                        this.Logger?.LogInformation($"执行Job{errMsg},Name:{job.Title} Guid:{job.Guid},正在进行第{job.ExecCount + 1}/{job.TryCount}重试");
                        await job.OnExecuteActionAsync();

                        if (!ReferenceEquals(this.CurrentJob, job))
                        {
                            // The job was finished or replaced while awaiting; the next loop
                            // iteration deals with whatever is current now.
                            return;
                        }

                        this.Logger?.LogDebug($"已执行Job,Name: {job.Title} Guid:{job.Guid}");
                    }
                }
            }

            switch (job.Status)
            {
                // First execution, or a manual re-send.
                case JobStatus.None:
                    this.Logger?.LogInformation($"准备执行Job,Name: {job.Title} Guid:{job.Guid}");
                    await job.OnExecuteActionAsync();
                    this.Logger?.LogDebug($"已执行Job,Name: {job.Title} Guid:{job.Guid}");
                    break;

                case JobStatus.RunSuccess:
                    this.DispatchResult(job, succeeded: true);
                    // Done: free the slot so the loop can take the next queued job.
                    this.CurrentJob = null;
                    break;

                case JobStatus.HasSend:
                case JobStatus.Ack:
                    this.CheckCompletedAction?.Invoke(job);
                    break;

                case JobStatus.RunFailed:
                    this.DispatchResult(job, succeeded: false);
                    break;
            }
        }
        catch (Exception ex)
        {
            // This step is not awaited by the loop, so failures must be observed here.
            this.Logger?.LogException(ex);
        }
    }

    /// <summary>
    /// Invokes the job's <c>ResultAction</c>, or, for job types that have none, enqueues the
    /// follow-up job configured for the given outcome.
    /// </summary>
    /// <param name="job">The job whose result is being reported.</param>
    /// <param name="succeeded"><c>true</c> for the success follow-up, <c>false</c> for the failure one.</param>
    private void DispatchResult(JobItemBase job, bool succeeded)
    {
        // NOTE(review): the follow-up job only runs for types other than JobItem/JobItemAsync,
        // exactly as the original else-if chain did - confirm this is intended.
        if (job is JobItem jobItem)
        {
            jobItem.ResultAction?.Invoke(job);
        }
        else if (job is JobItemAsync jobItemAsync)
        {
            jobItemAsync.ResultAction?.Invoke(job);
        }
        else
        {
            string followUp = succeeded ? job.RunSuccessWithJobName : job.RunFailedWithJobName;
            if (!string.IsNullOrWhiteSpace(followUp))
            {
                this.Executes(followUp);
            }
        }
    }
}