Vì vậy, tôi có quy trình dịch vụ cửa sổ thực hiện quy trình quy trình làm việc. Phần cuối sử dụng Repository và UnitofWork Pattern và Unity trên đầu trang Entity Framework với lớp thực thể được tạo ra từ edmx. Tôi sẽ không đi vào rất nhiều chi tiết như nó không cần thiết nhưng về cơ bản có 5 bước mà quy trình làm việc đi qua. Một quy trình cụ thể có thể ở bất kỳ giai đoạn nào vào bất kỳ thời điểm nào (theo thứ tự). Bước một chỉ tạo dữ liệu cho bước hai, xác nhận dữ liệu thông qua một quá trình chạy dài đến một máy chủ khác. Sau đó, bước có tạo ra một pdf với dữ liệu đó. Đối với mỗi giai đoạn chúng tôi sinh ra một bộ đếm thời gian, tuy nhiên nó được cấu hình để cho phép nhiều hơn một bộ đếm thời gian được sinh ra cho mỗi giai đoạn. Trong đó đặt vấn đề. Khi tôi thêm bộ xử lý vào một giai đoạn cụ thể, lỗi sau đây ngẫu nhiên:Khung thực thể đa luồng: Kết nối không được đóng. Trạng thái hiện tại của kết nối đang kết nối
Kết nối không được đóng. Trạng thái hiện tại của kết nối đang kết nối.
Đọc lên trên điều này có vẻ hiển nhiên là điều này đang xảy ra vì ngữ cảnh đang cố gắng truy cập cùng một thực thể từ hai luồng. Nhưng đây là nơi nó là loại ném cho tôi một vòng lặp. Tất cả thông tin tôi có thể tìm thấy trên trạng thái này là chúng ta nên sử dụng một ngữ cảnh cá thể cho mỗi luồng. Mà theo như tôi có thể nói tôi đang làm (xem mã dưới đây). Tôi không sử dụng mẫu đơn hoặc thống kê đơn hay bất cứ thứ gì vì vậy tôi không thực sự chắc chắn tại sao điều này đang xảy ra hoặc cách tránh nó. Tôi đã đăng các bit có liên quan của mã của tôi bên dưới để bạn xem xét.
Các kho cơ sở:
public class BaseRepository
{
/// <summary>
/// Initializes a repository and registers with a <see cref="IUnitOfWork"/>
/// </summary>
/// <param name="unitOfWork"></param>
public BaseRepository(IUnitOfWork unitOfWork)
{
if (unitOfWork == null) throw new ArgumentException("unitofWork");
UnitOfWork = unitOfWork;
}
/// <summary>
/// Returns a <see cref="DbSet"/> of entities.
/// </summary>
/// <typeparam name="TEntity">Entity type the dbset needs to return.</typeparam>
/// <returns></returns>
protected virtual DbSet<TEntity> GetDbSet<TEntity>() where TEntity : class
{
return Context.Set<TEntity>();
}
/// <summary>
/// Sets the state of an entity.
/// </summary>
/// <param name="entity">object to set state.</param>
/// <param name="entityState"><see cref="EntityState"/></param>
protected virtual void SetEntityState(object entity, EntityState entityState)
{
Context.Entry(entity).State = entityState;
}
/// <summary>
/// Unit of work controlling this repository.
/// </summary>
protected IUnitOfWork UnitOfWork { get; set; }
/// <summary>
///
/// </summary>
/// <param name="entity"></param>
protected virtual void Attach(object entity)
{
if (Context.Entry(entity).State == EntityState.Detached)
Context.Entry(entity).State = EntityState.Modified;
}
protected virtual void Detach(object entity)
{
Context.Entry(entity).State = EntityState.Detached;
}
/// <summary>
/// Provides access to the ef context we are working with
/// </summary>
internal StatementAutoEntities Context
{
get
{
return (StatementAutoEntities)UnitOfWork;
}
}
}
StatementAutoEntities là lớp EF autogenerated.
Việc thực hiện kho:
public class ProcessingQueueRepository : BaseRepository, IProcessingQueueRepository
{
/// <summary>
/// Creates a new repository and associated with a <see cref="IUnitOfWork"/>
/// </summary>
/// <param name="unitOfWork"></param>
public ProcessingQueueRepository(IUnitOfWork unitOfWork) : base(unitOfWork)
{
}
/// <summary>
/// Create a new <see cref="ProcessingQueue"/> entry in database
/// </summary>
/// <param name="Queue">
/// <see cref="ProcessingQueue"/>
/// </param>
public void Create(ProcessingQueue Queue)
{
GetDbSet<ProcessingQueue>().Add(Queue);
UnitOfWork.SaveChanges();
}
/// <summary>
/// Updates a <see cref="ProcessingQueue"/> entry in database
/// </summary>
/// <param name="queue">
/// <see cref="ProcessingQueue"/>
/// </param>
public void Update(ProcessingQueue queue)
{
//Attach(queue);
UnitOfWork.SaveChanges();
}
/// <summary>
/// Delete a <see cref="ProcessingQueue"/> entry in database
/// </summary>
/// <param name="Queue">
/// <see cref="ProcessingQueue"/>
/// </param>
public void Delete(ProcessingQueue Queue)
{
GetDbSet<ProcessingQueue>().Remove(Queue);
UnitOfWork.SaveChanges();
}
/// <summary>
/// Gets a <see cref="ProcessingQueue"/> by its unique Id
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
public ProcessingQueue GetById(int id)
{
return (from e in Context.ProcessingQueue_SelectById(id) select e).FirstOrDefault();
}
/// <summary>
/// Gets a list of <see cref="ProcessingQueue"/> entries by status
/// </summary>
/// <param name="status"></param>
/// <returns></returns>
public IList<ProcessingQueue> GetByStatus(int status)
{
return (from e in Context.ProcessingQueue_SelectByStatus(status) select e).ToList();
}
/// <summary>
/// Gets a list of all <see cref="ProcessingQueue"/> entries
/// </summary>
/// <returns></returns>
public IList<ProcessingQueue> GetAll()
{
return (from e in Context.ProcessingQueue_Select() select e).ToList();
}
/// <summary>
/// Gets the next pending item id in the queue for a specific work
/// </summary>
/// <param name="serverId">Unique id of the server that will process the item in the queue</param>
/// <param name="workTypeId">type of <see cref="WorkType"/> we are looking for</param>
/// <param name="operationId">if defined only operations of the type indicated are considered.</param>
/// <returns>Next pending item in the queue for the work type or null if no pending work is found</returns>
public int GetNextPendingItemId(int serverId, int workTypeId, int? operationId)
{
var id = Context.ProcessingQueue_GetNextPending(serverId, workTypeId, operationId).SingleOrDefault();
return id.HasValue ? id.Value : -1;
}
/// <summary>
/// Returns a list of <see cref="ProcessingQueueStatus_dto"/>s objects with all
/// active entries in the queue
/// </summary>
/// <returns></returns>
public IList<ProcessingQueueStatus_dto> GetActiveStatusEntries()
{
return (from e in Context.ProcessingQueueStatus_Select() select e).ToList();
}
/// <summary>
/// Bumps an entry to the front of the queue
/// </summary>
/// <param name="processingQueueId"></param>
public void Bump(int processingQueueId)
{
Context.ProcessingQueue_Bump(processingQueueId);
}
}
Chúng tôi sử dụng Unity cho dependency injection, một số mã gọi ví dụ:
#region Members
private readonly IProcessingQueueRepository _queueRepository;
#endregion
#region Constructors
/// <summary>Initializes ProcessingQueue services with repositories</summary>
/// <param name="queueRepository"><see cref="IProcessingQueueRepository"/></param>
public ProcessingQueueService(IProcessingQueueRepository queueRepository)
{
Check.Require(queueRepository != null, "processingQueueRepository is required");
_queueRepository = queueRepository;
}
#endregion
Mã trong dịch vụ cửa sổ đá ra khỏi giờ là như sau:
_staWorkTypeConfigLock.EnterReadLock();
foreach (var timer in from operation in (from o in _staWorkTypeConfig.WorkOperations where o.UseQueueForExecution && o.AssignedProcessors > 0 select o)
let interval = operation.SpawnInternval < 30 ? 30 : operation.SpawnInternval
select new StaTimer
{
Interval = _runImmediate ? 5000 : interval*1000,
Operation = (ProcessingQueue.RequestedOperation) operation.OperationId
})
{
timer.Elapsed += ApxQueueProcessingOnElapsedInterval;
timer.Enabled = true;
Logger.DebugFormat("Queue processing for operations of type {0} will execute every {1} seconds", timer.Operation, timer.Interval/1000);
}
_staWorkTypeConfigLock.ExitReadLock();
StaTimer chỉ là một trình bao bọc thêm giờ hoạt động. ApxQueueProcessingOnElapsedInterval sau đó bascially chỉ gán công việc cho quá trình dựa trên hoạt động.
Tôi cũng sẽ thêm một chút mã ApxQueueProcessingOnElapsedInterval nơi chúng tôi đang tạo các tác vụ.
_staTasksLock.EnterWriteLock();
for (var x = 0; x < tasksNeeded; x++)
{
var t = new Task(obj => ProcessStaQueue((QueueProcessConfig) obj),
CreateQueueProcessConfig(true, operation), _cancellationToken);
_staTasks.Add(new Tuple<ProcessingQueue.RequestedOperation, DateTime, Task>(operation, DateTime.Now,t));
t.Start();
Thread.Sleep(300); //so there are less conflicts fighting for jobs in the queue table
}
_staTasksLock.ExitWriteLock();
IoC có chứa 'Vứt bỏ' các trường hợp ngữ cảnh không? –