using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using BodyshopPartner.Utils;
using BodyshopPartner.Models;
using BodyshopPartner.Utils.Growls;
using GraphQL;
using Newtonsoft.Json.Linq;
using ToastNotifications.Messages;

namespace BodyshopPartner.Utils
{
    /// <summary>
    /// Serialized background queue that decodes CIECA/EMS estimate files and uploads
    /// them as "available jobs" via GraphQL. Items are drained one at a time by a
    /// single thread-pool worker; a file path already waiting in the queue is ignored.
    /// </summary>
    public static class JobProcessingQueue
    {
        private static readonly NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();

        // Pending work items. The queue instance doubles as the lock gate for all
        // queue state (the queue itself and _delegateQueuedOrRunning).
        // BUGFIX: was the non-generic System.Collections.Queue, which cannot be used
        // with the LINQ Any(...) duplicate check below (Enumerable.Any needs IEnumerable<T>).
        private static readonly Queue<DTO_QueueItem> _jobs = new Queue<DTO_QueueItem>();

        // True while a drain worker is queued or running. Guarded by lock (_jobs).
        private static bool _delegateQueuedOrRunning = false;

        private static GrowlNotification Growler;

        /// <summary>Registers the growl-notification sink used for UI toasts.</summary>
        public static void SetGrowler(GrowlNotification g)
        {
            Growler = g;
        }

        /// <summary>
        /// Adds a job to the queue unless an item with the same file path is already
        /// pending, and starts the drain worker if one is not running.
        /// </summary>
        /// <param name="item">The queue item describing the estimate file to process.</param>
        public static void Enqueue(DTO_QueueItem item)
        {
            lock (_jobs)
            {
                // Skip duplicates based on the source file name/path.
                if (_jobs.Any(j => j.FilePath == item.FilePath))
                {
                    logger.Debug("Ignoring job {0}, it's already in the queue.", item.FilePath);
                    return;
                }

                logger.Debug("Adding job to the queue. | {0}", item.FilePath);
                _jobs.Enqueue(item);

                if (!_delegateQueuedOrRunning)
                {
                    _delegateQueuedOrRunning = true;
                    ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
                }
            }
        }

        /// <summary>
        /// Drains the queue, uploading items one at a time until it is empty.
        /// </summary>
        /// <remarks>
        /// async void is deliberate: this is a thread-pool <see cref="WaitCallback"/>,
        /// which must return void. All awaited work is wrapped in try/catch so no
        /// exception can escape the async-void boundary.
        /// </remarks>
        private static async void ProcessQueuedItems(object ignored)
        {
            while (true)
            {
                DTO_QueueItem item;
                lock (_jobs)
                {
                    if (_jobs.Count == 0)
                    {
                        // Clear the flag inside the lock so a concurrent Enqueue
                        // either sees it cleared and starts a new worker, or adds
                        // an item we are about to pick up.
                        _delegateQueuedOrRunning = false;
                        break;
                    }
                    item = _jobs.Dequeue();
                }

                try
                {
                    await UploadJob(item);
                }
                catch (Exception ex)
                {
                    // BUGFIX: this previously queued a *second* ProcessQueuedItems
                    // worker while this loop kept running, so any failure spawned
                    // duplicate drain loops racing on the queue. The while loop
                    // already moves on to the next item; logging is sufficient.
                    logger.Error("Error processing job queue item. " + ex.ToString());
                }
            }
        }

        /// <summary>
        /// Processes one queue item: refreshes op codes, decodes the estimate file,
        /// and upserts the result as an available job.
        /// </summary>
        /// <param name="item">The queue item to process.</param>
        public static async Task UploadJob(DTO_QueueItem item)
        {
            // Allow a small amount of time to pass before processing the queue item
            // so that any writes to the estimate file can finish.
            // BUGFIX: was Thread.Sleep(1000), which blocked the drain worker's thread.
            await Task.Delay(1000);
            await GetOpCodes();
            DecodeQueueItemJob(item);
            await UpsertQueueItem(item);
        }

        /// <summary>
        /// Refreshes the cached CIECA op-code table for the active shop region.
        /// </summary>
        public static async Task GetOpCodes()
        {
            var r = new GraphQLRequest
            {
                Query = @"query MyQuery($key: String!) { masterdata(where: {key: {_eq: $key}}) { value key } }",
                Variables = new { key = AppMetaData.ShopRegion + "_ciecaopcodes" }
            };

            var data = await Utils.GraphQL.ExecuteQuery(r);

            // NOTE(review): assumes the query always returns at least one masterdata
            // row for this region key — confirm; an empty result throws here.
            Utils.AppMetaData.CiecaOpCodes = JObject.Parse(data.masterdata[0].value.Value);
        }

        // Decodes the EMS/CIECA estimate file into the item's Job payload.
        // NOTE(review): the decoder appears to return a bool on failure rather than
        // an estimate object — see the type check in UpsertQueueItem.
        private static void DecodeQueueItemJob(DTO_QueueItem item)
        {
            item.Job = Utils.Decoder.EstimateDecoder.CIECAEstimateImport.DecodeEstimate(item.FilePath);
        }

        /// <summary>
        /// Builds the available-job row from the decoded estimate and inserts it
        /// (updating on claim-number conflict), then shows a toast with the outcome.
        /// </summary>
        /// <param name="item">The queue item whose decoded Job payload is uploaded.</param>
        private static async Task UpsertQueueItem(DTO_QueueItem item)
        {
            // A Boolean Job means the decoder hit an incomplete EMS set.
            if (item.Job is Boolean)
            {
                logger.Error("The currently process job is missing part of the EMS set and could not be processed.");
                string failMsg = Properties.Resources.Msg_NewJobUploadError;
                Notifications.notifier.ShowError(failMsg);
                return;
            }

            //TODO: This should perform some sort of upsert to update the vehicle with more up to date information.

            // Build the available_jobs row from the decoded estimate.
            dynamic newJob = new JObject();
            newJob.uploaded_by = Auth.authlink.User.Email;
            newJob.bodyshopid = AppMetaData.ActiveShopId;
            newJob.cieca_id = item.Job.ciecaid;
            newJob.est_data = item.Job;
            newJob.ownr_name = item.Job.ownr_fn?.Value + " " + item.Job.ownr_ln?.Value + " " + item.Job.ownr_co_nm?.Value;
            newJob.ins_co_nm = item.Job.ins_co_nm?.Value;
            newJob.vehicle_info = item.Job.vehicle.data.v_model_yr?.Value + " " + item.Job.vehicle.data.v_make_desc?.Value + " " + item.Job.vehicle.data.v_model_desc?.Value;
            newJob.clm_no = item.Job.clm_no?.Value;
            newJob.clm_amt = item.Job.clm_total?.Value;

            // If the VIN matches a known vehicle, reference it by uuid instead of
            // embedding the full vehicle record (saves DB space).
            var vehuuid = await Utils.Queries.VehicleQueries.GetVehicleUuidByVin(item?.Job?.vehicle?.data?.v_vin?.Value ?? "");
            if (!string.IsNullOrEmpty(vehuuid))
            {
                newJob.est_data.vehicle = null;
                newJob.est_data.vehicleid = vehuuid;
            }

            // An existing job with this claim number means this upload is a supplement.
            string jobId = await Utils.Queries.JobsQueries.CheckSupplementByClaimNo(item.Job.clm_no?.Value ?? "");
            if (!string.IsNullOrEmpty(jobId))
            {
                newJob.issupplement = true;
                newJob.jobid = jobId;
            }

            var r = new GraphQLRequest
            {
                Query = @" mutation INSERT_AVAILABLE_JOB($jobInput: [available_jobs_insert_input!]!) { insert_available_jobs(objects: $jobInput, on_conflict: {constraint: available_jobs_clm_no_bodyshopid_key, update_columns: [clm_amt, cieca_id, est_data, issupplement, ownr_name, source_system, supplement_number, vehicle_info]}) { returning { id } } } ",
                Variables = new { jobInput = newJob }
            };

            try
            {
                var d = await Utils.GraphQL.ExecuteQuery(r);
                logger.Trace("Job insert succesful. Show notification");

                // Toasts must be shown on the UI thread.
                App.Current.Dispatcher.Invoke(() =>
                {
                    string msg = Properties.Resources.Msg_NewJobUploaded + " " + item.Job?.ownr_fn?.Value + " " + item.Job?.ownr_ln?.Value + " | " + item.Job?.clm_no?.Value;
                    Utils.Notifications.notifier.ShowSuccess(msg);
                });
            }
            catch (Exception ex)
            {
                logger.Error("Job insert failed. Show notification " + ex.ToString());

                App.Current.Dispatcher.Invoke(() =>
                {
                    string msg = Properties.Resources.Msg_NewJobUploadError + " " + item.Job?.ownr_fn?.Value + " " + item.Job?.ownr_ln?.Value + ex.Message;
                    Notifications.notifier.ShowError(msg);
                });
            }
        }
    }
}