using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using BodyshopUploader.Utils;
using BodyshopUploader.Models;
using BodyshopUploader.Utils.Growls;
using GraphQL.Common.Request;
using Newtonsoft.Json.Linq;

namespace BodyshopUploader.Utils
{
    /// <summary>
    /// Serialized background work queue: decodes CIECA estimate files and uploads
    /// the resulting job via GraphQL, one item at a time. Items are de-duplicated
    /// by file path while they are queued or being processed.
    /// </summary>
    public static class JobProcessingQueue
    {
        private static readonly NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();

        // Pending work items. The queue instance doubles as the lock object
        // guarding all queue state (_jobs contents and _delegateQueuedOrRunning).
        private static readonly Queue<DTO_QueueItem> _jobs = new Queue<DTO_QueueItem>();

        // True while a worker is queued or draining _jobs; guarded by lock (_jobs).
        private static bool _delegateQueuedOrRunning = false;

        // Notification sink used to report upload results on the UI thread.
        private static GrowlNotification Growler;

        /// <summary>Sets the notification sink used to report upload results.</summary>
        public static void SetGrowler(GrowlNotification g)
        {
            Growler = g;
        }

        /// <summary>
        /// Adds a work item to the queue unless an item with the same file path is
        /// already pending, and starts a worker if none is currently running.
        /// </summary>
        /// <param name="item">The queue item describing the estimate file to process.</param>
        public static void Enqueue(DTO_QueueItem item)
        {
            lock (_jobs)
            {
                // De-duplicate on file path: repeated file-watcher events for the
                // same file must not produce multiple uploads.
                if (_jobs.Any(j => j.FilePath == item.FilePath))
                {
                    logger.Debug("Ignoring job {0}, it's already in the queue.", item.FilePath);
                    return;
                }

                logger.Debug("Adding job to the queue. | {0}", item.FilePath);
                _jobs.Enqueue(item);

                if (!_delegateQueuedOrRunning)
                {
                    _delegateQueuedOrRunning = true;
                    ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
                }
            }
        }

        /// <summary>
        /// Worker loop that drains the queue one item at a time.
        /// async void is imposed by the WaitCallback signature; exceptions are
        /// caught and logged rather than rethrown, because an unhandled exception
        /// escaping an async void method would crash the process.
        /// </summary>
        private static async void ProcessQueuedItems(object ignored)
        {
            while (true)
            {
                DTO_QueueItem item;
                lock (_jobs)
                {
                    if (_jobs.Count == 0)
                    {
                        _delegateQueuedOrRunning = false;
                        break;
                    }

                    // Only peek (don't dequeue yet) so the de-duplication check in
                    // Enqueue still sees the item while it is being processed.
                    item = _jobs.Peek();
                }

                try
                {
                    // Allow a small amount of time to pass before processing so
                    // that any in-progress writes to the file can finish.
                    await Task.Delay(1000);
                    DecodeQueueItemJob(item);
                    await UpsertQueueItem(item);
                }
                catch (Exception ex)
                {
                    logger.Error(ex, "Error processing job queue item. | {0}", item.FilePath);
                }
                finally
                {
                    // Remove the item whether it succeeded or failed: leaving a
                    // failed item at the head would make this loop retry it forever.
                    // Dequeue under the lock so it cannot race with Enqueue.
                    lock (_jobs)
                    {
                        if (_jobs.Count > 0 && ReferenceEquals(_jobs.Peek(), item))
                        {
                            _jobs.Dequeue();
                        }
                    }
                }
            }
        }

        /// <summary>Decodes the CIECA estimate file into the item's Job payload.</summary>
        private static void DecodeQueueItemJob(DTO_QueueItem item)
        {
            item.Job = Utils.Decoder.EstimateDecoder.CIECAEstimateImport.DecodeEstimate(item.FilePath);
        }

        /// <summary>
        /// Inserts the decoded job via GraphQL and shows a success or failure
        /// growl notification on the UI dispatcher thread.
        /// </summary>
        /// <param name="item">Queue item whose decoded Job is uploaded.</param>
        private static async Task UpsertQueueItem(DTO_QueueItem item)
        {
            var request = new GraphQLRequest
            {
                Query = @"
                    mutation INSERT_JOB($jobInput: [jobs_insert_input!]!) {
                        insert_jobs(objects: $jobInput) {
                            returning { id }
                        }
                    }",
                Variables = new { jobInput = item.Job }
            };

            var result = await Utils.GraphQL.ExecuteQuery(request);
            if (result != null)
            {
                logger.Trace("Job insert successful. Show notification");
                App.Current.Dispatcher.Invoke(() =>
                {
                    Growler.AddNotification(new Notification()
                    {
                        Title = Properties.Resources.Msg_NewJobUploaded,
                        Subtitle = item.Job?.owner?.data.first_name?.Value + " " + item.Job?.owner?.data.last_name?.Value,
                        Message = item.Job?.vehicle?.data.v_model_yr?.Value + " " + item.Job?.vehicle?.data.v_make_desc?.Value + " " + item.Job?.vehicle?.data.v_model_desc?.Value
                    });
                });
            }
            else
            {
                // Insert failed: surface an error notification.
                // NOTE(review): this branch reads owner/vehicle without the
                // intermediate .data member used in the success branch — confirm
                // which shape the decoded Job actually has.
                logger.Error("Job insert failed. Show notification");
                App.Current.Dispatcher.Invoke(() =>
                {
                    Growler.AddNotification(new Notification()
                    {
                        Title = Properties.Resources.Msg_NewJobUploadError,
                        Subtitle = item.Job.owner?.first_name?.Value + " " + item.Job.owner?.last_name?.Value,
                        Message = item.Job.vehicle?.v_model_yr?.Value + " " + item.Job.vehicle?.v_make_desc?.Value + " " + item.Job.vehicle?.v_model_desc?.Value
                    });
                });
            }
        }
    }
}