DEV Community

Masui Masanori
Masui Masanori

Posted on

[ASP.NET Core][Entity Framework Core] Try concurrency

Specification

  1. Some users has own lists
  2. Every list items have "PrintType"
  3. A service execute them sequentially per "PrintType"
  4. Before starting executions, the data are inserted into Database
  5. If their execution orders are changed, I will update Database
  6. After finishing execution, I also will update Database

Alt Text

Environments

  • ASP.NET Core ver.5.0.202
  • Npgsql.EntityFrameworkCore.PostgreSQL ver.5.0.5.1
  • Microsoft.EntityFrameworkCore ver.5.0.5

From users to controller

For implementation Specification 3., the "conroller" class must be singleton.

Startup.cs

using ExecuteSample.Models;
using ExecuteSample.Prints;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace ExecuteSample
{
    public class Startup
    {
        private readonly IConfiguration config;
        public Startup(IConfiguration config)
        {
            this.config = config;
        }
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddRazorPages();
            services.AddControllers();
            services.AddDbContext<ExecuteSampleContext>(options =>
                options.UseNpgsql(this.config["DbConnection"]));
            services.AddSingleton<IPrintAppController, PrintAppController>();
            services.AddScoped<IExecutionLists, ExecutionLists>();
            services.AddScoped<IPrintStarter, PrintStarter>();
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            app.UseRouting();
            app.UseStaticFiles();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

HomeController.cs

using System.Collections.Generic;
using System.Threading.Tasks;
using ExecuteSample.Prints;
using ExecuteSample.Prints.Dto;
using Microsoft.AspNetCore.Mvc;
namespace ExecuteSample.Controllers
{
    public class HomeController: Controller
    {
        private readonly IPrintStarter starter;
        public HomeController(IPrintStarter starter)
        {
            this.starter = starter;
        }
        [Route("")]
        public async Task<Applications.ActionResult> Index()
        {
            return await this.starter.StartAsync(new List<StartExecutionItem>
            {
                new StartExecutionItem(null, "Item1", 0),
                new StartExecutionItem(null, "Item2", 1),
                new StartExecutionItem(null, "Item3", 2),
            });
        }
        [Route("Start2")]
        public async Task<Applications.ActionResult> Index2()
        {
            return await this.starter.StartAsync(new List<StartExecutionItem>
            {
                new StartExecutionItem(null, "Item4", 1),
                new StartExecutionItem(null, "Item5", 2),
                new StartExecutionItem(null, "Item6", 0),
            });
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

StartExecutionItem.cs

namespace ExecuteSample.Prints.Dto
{
    public record StartExecutionItem(string? Key, string FilePath, int PrintType);
}
Enter fullscreen mode Exit fullscreen mode

IPrintStarter.cs

using System.Collections.Generic;
using System.Threading.Tasks;
using ExecuteSample.Applications;
using ExecuteSample.Prints.Dto;
namespace ExecuteSample.Prints
{
    public interface IPrintStarter
    {
        Task<ActionResult> StartAsync(List<StartExecutionItem> items);
    }
}
Enter fullscreen mode Exit fullscreen mode

PrintStarter.cs

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;
using ExecuteSample.Applications;
using ExecuteSample.Models;
using ExecuteSample.Prints.Dto;
using ExecuteSample.Prints.Models;
namespace ExecuteSample.Prints
{
    public class PrintStarter: IPrintStarter
    {
        private readonly ExecuteSampleContext context;
        private readonly IExecutionLists executionLists;
        private readonly IPrintAppController appController;
        public PrintStarter(ExecuteSampleContext context,
            IExecutionLists executionLists,
            IPrintAppController appController)
        {
            this.context = context;
            this.executionLists = executionLists;
            this.appController = appController;
        }
        public async Task<ActionResult> StartAsync(List<StartExecutionItem> items)
        {
            // Get key for separating between other users request
            var targetKey = this.appController.GenerateKey();
            // Insert list items into Database
            var createdItems = await this.executionLists.CreateAsync(items);
            // keep their IDs to determine if all of them are executed
            var ids = new BlockingCollection<int>();
            foreach(var id in createdItems.Select(i => i.Id))
            {
                ids.Add(id);
            }
            // After execution, the executer class will fire events
            Action<string, ExecutionItem, ActionResult>? handler = null;
            handler = async (key, item, result) => {
                if(key != targetKey)
                {
                    return;
                }
                await this.executionLists.FinishAsync(item.Id, result);
                var removedIds = new List<int>();
                for(var i = 0; i < ids.Count; i++)
                {
                    if(ids.TryTake(out var id) == false ||
                        id == item.Id)
                    {
                        continue;
                    }
                    removedIds.Add(id);
                }
                foreach(var id in removedIds)
                {
                    ids.Add(id);
                }
                // After finishing all executions, remove the action
                if(ids.Count <= 0)
                {
                    this.appController.OnNext -= handler;
                    ids.Dispose();
                }
            };
            this.appController.OnNext += handler;    
            // Add them into execution list and get execution orders     
            var updatedItems = this.appController.AddItems(targetKey, createdItems);
            // Update Database   
            await this.executionLists.UpdateAsync(updatedItems);
            this.appController.StartIfNotStarted();
            // the request doesn't wait finishing executions
            return ActionResultFactory.GetSucceeded();
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

IPrintAppController.cs

using System;
using System.Collections.Generic;
using ExecuteSample.Applications;
using ExecuteSample.Prints.Models;
namespace ExecuteSample.Prints
{
    public interface IPrintAppController: IDisposable
    {
        Action<string, ExecutionItem, ActionResult>? OnNext { get; set; }
        string GenerateKey();
        List<ExecutionItem> AddItems(string key, List<ExecutionItem> items);
        void StartIfNotStarted();
    }
}
Enter fullscreen mode Exit fullscreen mode

PrintAppController.cs

using System;
using System.Linq;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Linq;
using ExecuteSample.Prints.Models;
using ExecuteSample.Applications;
namespace ExecuteSample.Prints
{
    public record ExecutionResult(string Key, ExecutionItem Target, ActionResult Result);
    public class PrintAppController: IPrintAppController
    {
        // For share instances between multithread
        private BlockingCollection<PrintApp> apps;
        private BlockingCollection<string> keys;
        public Action<string, ExecutionItem, ActionResult>? OnNext { get; set; }
        public PrintAppController()
        {
            this.apps = new BlockingCollection<PrintApp>();
            this.keys = new BlockingCollection<string>();
        }
        public string GenerateKey()
        {            
            var index = 0;
            var key = $"{DateTime.Now:yyyyMMddHHmmssfff}";
            while(true)
            {
                if(this.keys.Any(k => k == $"{key}_{index}") == false) 
                {
                    var resultKey = $"{key}_{index}";
                    this.keys.Add(resultKey);
                    return resultKey;
                }
                index += 1;
            }
        }
        public List<ExecutionItem> AddItems(string key, List<ExecutionItem> items)
        {
            // Create execution group per "PrintType"
            var types = items.Select(p => p.PrintType).Distinct().ToArray();
            var results = new List<ExecutionItem>();
            foreach(var t in types)
            {
                var type = t;
                var app = this.apps.FirstOrDefault(a => a.PrintType == type);
                if(app == null)
                {
                    app = new PrintApp();
                    app.Init(type);
                    this.apps.TryAdd(app);
                    // Get finish execution events
                    app.OnNext += (target, result) => {
                        this.OnNext?.Invoke(target.Key, target.Item, result);
                    };
                }
                var updatedItems = app.AddList(key, items.Where(p => p.PrintType == type));
                results.AddRange(updatedItems);
            }
            return results;
        }
        public void StartIfNotStarted()
        {
            foreach(var app in this.apps)
            {
                if(app.Stopped)
                {
                    app.StartAsync();
                }
            }
        }
        public void Dispose()
        {
            // BlockingCollection must be disposed
            this.apps.Dispose();
            this.keys.Dispose();
            this.OnNext = null;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

PrintApp.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ExecuteSample.Applications;
using ExecuteSample.Prints.Dto;
using ExecuteSample.Prints.Models;
namespace ExecuteSample.Prints
{
    public class PrintApp
    {
        // Because this class is kept by BlockingCollection
        // I don't need to consider behaviours of these member variables under multithreaded
        public int PrintType { get; private set; }
        public bool Stopped { get; private set; }
        public Action<ExecuteTarget, ActionResult>? OnNext { get; set; }
        public Action? OnCompleted { get; set; }
        private List<ExecuteTarget> properties;
        private bool canceled = false;
        public PrintApp()
        {
            this.properties = new List<ExecuteTarget>();
        }
        public void Init(int printType)
        {
            this.PrintType = printType;
            this.Stopped = true;
            this.canceled = false;
        }
        public IEnumerable<ExecutionItem> AddList(string key, IEnumerable<ExecutionItem> newItems)
        {
            var lastOrderNumber = (this.properties.Count <= 0)? 
                0: this.properties.Select(p => p.Item.ExecutionOrder).Max();
            var results = new List<ExecutionItem>();
            foreach(var i in newItems)
            {
                lastOrderNumber += 1;
                var item = ExecutionItem.UpdateExecutionOrder(i, lastOrderNumber);
                results.Add(item);
                this.properties.Add(new ExecuteTarget(key, item));
            }
            return results;
        }
        public async void StartAsync()
        {
            this.Stopped = false;
            await ExecuteAsync();
        }
        private async Task ExecuteAsync()
        {
            while(this.properties.Count > 0)
            {
                await Task.Delay(5000);
                var item = this.properties[0];
                this.properties.Remove(item);
                OnNext?.Invoke(item, ActionResultFactory.GetSucceeded());
                if(this.properties.Count <= 0)
                {
                    this.Stopped = true;
                    OnCompleted?.Invoke();
                    break;
                }
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

ExecuteSampleContext.cs

using ExecuteSample.Prints.Models;
using Microsoft.EntityFrameworkCore;
namespace ExecuteSample.Models
{
    public class ExecuteSampleContext: DbContext
    {
        public ExecuteSampleContext(DbContextOptions<ExecuteSampleContext> options)
            : base(options)
        {
        }
        public DbSet<ExecutionItem> ExecutionItems => Set<ExecutionItem>();
    }
}
Enter fullscreen mode Exit fullscreen mode

IExecutionLists.cs

using System.Collections.Generic;
using System.Threading.Tasks;
using ExecuteSample.Applications;
using ExecuteSample.Prints.Dto;
using ExecuteSample.Prints.Models;
namespace ExecuteSample.Prints
{
    public interface IExecutionLists
    {
        Task<List<ExecutionItem>> CreateAsync(List<StartExecutionItem> items);
        Task UpdateAsync(List<ExecutionItem> items);
        Task FinishAsync(int id, ActionResult result);
    }
}
Enter fullscreen mode Exit fullscreen mode

ExecutionLists.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ExecuteSample.Applications;
using ExecuteSample.Models;
using ExecuteSample.Prints.Dto;
using ExecuteSample.Prints.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
namespace ExecuteSample.Prints
{
    public class ExecutionLists: IExecutionLists
    {
        private readonly ExecuteSampleContext context;
        private readonly IServiceScopeFactory scopeFactory;
        public ExecutionLists(ExecuteSampleContext context,
            IServiceScopeFactory scopeFactory)
        {
            this.context = context;
            this.scopeFactory = scopeFactory;
        }
        public async Task<List<ExecutionItem>> CreateAsync(List<StartExecutionItem> items)
        {
            using var transaction = await this.context.Database.BeginTransactionAsync();
            var results = items.Select(i => new ExecutionItem
            {
                FilePath = i.FilePath,
                PrintType = i.PrintType,
                FinishedTime = null,
                ErrorMessage = null,
                LastUpdateDate = DateTime.Now,
            })
            .ToList();
            await this.context.ExecutionItems.AddRangeAsync(results);
            await this.context.SaveChangesAsync();
            await transaction.CommitAsync();
            return results;
        }
        public async Task UpdateAsync(List<ExecutionItem> items)
        {
            using var transaction = await this.context.Database.BeginTransactionAsync();
            foreach(var i in items)
            {
                var id = i.Id;
                var target = await context.ExecutionItems.FirstAsync(i => i.Id == id);
                target.Update(i);
            }
            await this.context.SaveChangesAsync();
            await transaction.CommitAsync();
        }
        public async Task FinishAsync(int id, ActionResult result)
        {
            // Don't use the instance what is injected by DI container
            using(var scope = scopeFactory.CreateScope()) 
            {
                var context = scope.ServiceProvider.GetRequiredService<ExecuteSampleContext>();
                using var transaction = await context.Database.BeginTransactionAsync();
                var target = await context.ExecutionItems.FirstAsync(i => i.Id == id);
                target.UpdateExecutionResult(result);
                await context.SaveChangesAsync();
                await transaction.CommitAsync();
            }            
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

DbContext

At the eventhandler in "StartAsync", I must be careful to use DbContext.
Because when the event is fired, the instance what has been injected by DI container has already been disposed.

PrintStarter.cs

...
        public async Task<ActionResult> StartAsync(List<StartExecutionItem> items)
        {
...
            // After execution, the executer class will fire events
            Action<string, ExecutionItem, ActionResult>? handler = null;
            handler = async (key, item, result) => {
                if(key != targetKey)
                {
                    return;
                }
                await this.executionLists.FinishAsync(item.Id, result);
...
Enter fullscreen mode Exit fullscreen mode

ExecutionLists.cs

...
        public async Task FinishAsync(int id, ActionResult result)
        {
            // Don't do this
            using var transaction = await context.Database.BeginTransactionAsync();
            var target = await context.ExecutionItems.FirstAsync(i => i.Id == id);
            target.UpdateExecutionResult(result);
            await context.SaveChangesAsync();
            await transaction.CommitAsync();          
        }
...
Enter fullscreen mode Exit fullscreen mode

So if I run the above code, I will get an exception.

Unhandled exception. Unhandled exception. Unhandled exception. System.ObjectDisposedException: Cannot access a disposed context instance. A common cause of this error is disposing a context instance that was resolved from dependency injection and then later trying to use the same context instance elsewhere in your application. This may occur if you are calling 'Dispose' on the context instance, or wrapping it in a using statement. If you are using dependency injection, you should let the dependency injection container take care of disposing context instances.
Object name: 'ExecuteSampleContext'.
...
Enter fullscreen mode Exit fullscreen mode

Resources

Discussion (0)