2012-09-20 45 views
21

Vì vậy, tôi có quy trình dịch vụ cửa sổ thực hiện quy trình quy trình làm việc. Phần cuối sử dụng Repository và UnitofWork Pattern và Unity trên đầu trang Entity Framework với lớp thực thể được tạo ra từ edmx. Tôi sẽ không đi vào rất nhiều chi tiết như nó không cần thiết nhưng về cơ bản có 5 bước mà quy trình làm việc đi qua. Một quy trình cụ thể có thể ở bất kỳ giai đoạn nào vào bất kỳ thời điểm nào (theo thứ tự). Bước một chỉ tạo dữ liệu cho bước hai, xác nhận dữ liệu thông qua một quá trình chạy dài đến một máy chủ khác. Sau đó, bước có tạo ra một pdf với dữ liệu đó. Đối với mỗi giai đoạn chúng tôi sinh ra một bộ đếm thời gian, tuy nhiên nó được cấu hình để cho phép nhiều hơn một bộ đếm thời gian được sinh ra cho mỗi giai đoạn. Trong đó đặt vấn đề. Khi tôi thêm bộ xử lý vào một giai đoạn cụ thể, lỗi sau đây ngẫu nhiên:Khung thực thể đa luồng: Kết nối không được đóng. Trạng thái hiện tại của kết nối đang kết nối

Kết nối không được đóng. Trạng thái hiện tại của kết nối đang kết nối.

Đọc lên trên điều này có vẻ hiển nhiên là điều này đang xảy ra vì ngữ cảnh đang cố gắng truy cập cùng một thực thể từ hai luồng. Nhưng đây là nơi nó là loại ném cho tôi một vòng lặp. Tất cả thông tin tôi có thể tìm thấy trên trạng thái này là chúng ta nên sử dụng một ngữ cảnh cá thể cho mỗi luồng. Mà theo như tôi có thể nói tôi đang làm (xem mã dưới đây). Tôi không sử dụng mẫu đơn hoặc thống kê đơn hay bất cứ thứ gì vì vậy tôi không thực sự chắc chắn tại sao điều này đang xảy ra hoặc cách tránh nó. Tôi đã đăng các bit có liên quan của mã của tôi bên dưới để bạn xem xét.

Các kho cơ sở:

public class BaseRepository 
{ 
    /// <summary> 
    /// Initializes a repository and registers with a <see cref="IUnitOfWork"/> 
    /// </summary> 
    /// <param name="unitOfWork"></param> 
    public BaseRepository(IUnitOfWork unitOfWork) 
    { 
     if (unitOfWork == null) throw new ArgumentException("unitofWork"); 
     UnitOfWork = unitOfWork; 
    } 


    /// <summary> 
    /// Returns a <see cref="DbSet"/> of entities. 
    /// </summary> 
    /// <typeparam name="TEntity">Entity type the dbset needs to return.</typeparam> 
    /// <returns></returns> 
    protected virtual DbSet<TEntity> GetDbSet<TEntity>() where TEntity : class 
    { 

     return Context.Set<TEntity>(); 
    } 

    /// <summary> 
    /// Sets the state of an entity. 
    /// </summary> 
    /// <param name="entity">object to set state.</param> 
    /// <param name="entityState"><see cref="EntityState"/></param> 
    protected virtual void SetEntityState(object entity, EntityState entityState) 
    { 
     Context.Entry(entity).State = entityState; 
    } 

    /// <summary> 
    /// Unit of work controlling this repository.  
    /// </summary> 
    protected IUnitOfWork UnitOfWork { get; set; } 

    /// <summary> 
    /// 
    /// </summary> 
    /// <param name="entity"></param> 
    protected virtual void Attach(object entity) 
    { 
     if (Context.Entry(entity).State == EntityState.Detached) 
      Context.Entry(entity).State = EntityState.Modified; 
    } 

    protected virtual void Detach(object entity) 
    { 
     Context.Entry(entity).State = EntityState.Detached; 
    } 

    /// <summary> 
    /// Provides access to the ef context we are working with 
    /// </summary> 
    internal StatementAutoEntities Context 
    { 
     get 
     {     
      return (StatementAutoEntities)UnitOfWork; 
     } 
    } 
} 

StatementAutoEntities là lớp EF autogenerated.

Việc thực hiện kho:

public class ProcessingQueueRepository : BaseRepository, IProcessingQueueRepository 
{ 

    /// <summary> 
    /// Creates a new repository and associated with a <see cref="IUnitOfWork"/> 
    /// </summary> 
    /// <param name="unitOfWork"></param> 
    public ProcessingQueueRepository(IUnitOfWork unitOfWork) : base(unitOfWork) 
    { 
    } 

    /// <summary> 
    /// Create a new <see cref="ProcessingQueue"/> entry in database 
    /// </summary> 
    /// <param name="Queue"> 
    ///  <see cref="ProcessingQueue"/> 
    /// </param> 
    public void Create(ProcessingQueue Queue) 
    { 
     GetDbSet<ProcessingQueue>().Add(Queue); 
     UnitOfWork.SaveChanges(); 
    } 

    /// <summary> 
    /// Updates a <see cref="ProcessingQueue"/> entry in database 
    /// </summary> 
    /// <param name="queue"> 
    ///  <see cref="ProcessingQueue"/> 
    /// </param> 
    public void Update(ProcessingQueue queue) 
    { 
     //Attach(queue); 
     UnitOfWork.SaveChanges(); 
    } 

    /// <summary> 
    /// Delete a <see cref="ProcessingQueue"/> entry in database 
    /// </summary> 
    /// <param name="Queue"> 
    ///  <see cref="ProcessingQueue"/> 
    /// </param> 
    public void Delete(ProcessingQueue Queue) 
    { 
     GetDbSet<ProcessingQueue>().Remove(Queue); 
     UnitOfWork.SaveChanges(); 
    } 

    /// <summary> 
    /// Gets a <see cref="ProcessingQueue"/> by its unique Id 
    /// </summary> 
    /// <param name="id"></param> 
    /// <returns></returns> 
    public ProcessingQueue GetById(int id) 
    { 
     return (from e in Context.ProcessingQueue_SelectById(id) select e).FirstOrDefault(); 
    } 

    /// <summary> 
    /// Gets a list of <see cref="ProcessingQueue"/> entries by status 
    /// </summary> 
    /// <param name="status"></param> 
    /// <returns></returns> 
    public IList<ProcessingQueue> GetByStatus(int status) 
    { 
     return (from e in Context.ProcessingQueue_SelectByStatus(status) select e).ToList(); 
    } 

    /// <summary> 
    /// Gets a list of all <see cref="ProcessingQueue"/> entries 
    /// </summary> 
    /// <returns></returns> 
    public IList<ProcessingQueue> GetAll() 
    { 
     return (from e in Context.ProcessingQueue_Select() select e).ToList(); 
    } 

    /// <summary> 
    /// Gets the next pending item id in the queue for a specific work   
    /// </summary> 
    /// <param name="serverId">Unique id of the server that will process the item in the queue</param> 
    /// <param name="workTypeId">type of <see cref="WorkType"/> we are looking for</param> 
    /// <param name="operationId">if defined only operations of the type indicated are considered.</param> 
    /// <returns>Next pending item in the queue for the work type or null if no pending work is found</returns> 
    public int GetNextPendingItemId(int serverId, int workTypeId, int? operationId) 
    { 
     var id = Context.ProcessingQueue_GetNextPending(serverId, workTypeId, operationId).SingleOrDefault(); 
     return id.HasValue ? id.Value : -1; 
    } 

    /// <summary> 
    /// Returns a list of <see cref="ProcessingQueueStatus_dto"/>s objects with all 
    /// active entries in the queue 
    /// </summary> 
    /// <returns></returns> 
    public IList<ProcessingQueueStatus_dto> GetActiveStatusEntries() 
    { 
     return (from e in Context.ProcessingQueueStatus_Select() select e).ToList(); 
    } 
    /// <summary> 
    /// Bumps an entry to the front of the queue 
    /// </summary> 
    /// <param name="processingQueueId"></param> 
    public void Bump(int processingQueueId) 
    { 
     Context.ProcessingQueue_Bump(processingQueueId); 
    } 
} 

Chúng tôi sử dụng Unity cho dependency injection, một số mã gọi ví dụ:

#region Members 
    private readonly IProcessingQueueRepository _queueRepository;  
    #endregion 

    #region Constructors 
    /// <summary>Initializes ProcessingQueue services with repositories</summary> 
    /// <param name="queueRepository"><see cref="IProcessingQueueRepository"/></param>   
    public ProcessingQueueService(IProcessingQueueRepository queueRepository) 
    { 
     Check.Require(queueRepository != null, "processingQueueRepository is required"); 
     _queueRepository = queueRepository; 

    } 
    #endregion 

Mã trong dịch vụ cửa sổ đá ra khỏi giờ là như sau:

  _staWorkTypeConfigLock.EnterReadLock(); 
     foreach (var timer in from operation in (from o in _staWorkTypeConfig.WorkOperations where o.UseQueueForExecution && o.AssignedProcessors > 0 select o) 
           let interval = operation.SpawnInternval < 30 ? 30 : operation.SpawnInternval 
           select new StaTimer 
          { 
           Interval = _runImmediate ? 5000 : interval*1000, 
           Operation = (ProcessingQueue.RequestedOperation) operation.OperationId 
          }) 
     { 
      timer.Elapsed += ApxQueueProcessingOnElapsedInterval; 
      timer.Enabled = true; 
      Logger.DebugFormat("Queue processing for operations of type {0} will execute every {1} seconds", timer.Operation, timer.Interval/1000);     
     } 
     _staWorkTypeConfigLock.ExitReadLock(); 

StaTimer chỉ là một trình bao bọc thêm giờ hoạt động. ApxQueueProcessingOnElapsedInterval sau đó bascially chỉ gán công việc cho quá trình dựa trên hoạt động.

Tôi cũng sẽ thêm một chút mã ApxQueueProcessingOnElapsedInterval nơi chúng tôi đang tạo các tác vụ.

  _staTasksLock.EnterWriteLock(); 
     for (var x = 0; x < tasksNeeded; x++) 
     { 
      var t = new Task(obj => ProcessStaQueue((QueueProcessConfig) obj), 
          CreateQueueProcessConfig(true, operation), _cancellationToken); 


      _staTasks.Add(new Tuple<ProcessingQueue.RequestedOperation, DateTime, Task>(operation, DateTime.Now,t)); 

      t.Start(); 
      Thread.Sleep(300); //so there are less conflicts fighting for jobs in the queue table 
     } 
     _staTasksLock.ExitWriteLock(); 
+0

IoC có chứa 'Vứt bỏ' các trường hợp ngữ cảnh không? –

Trả lời

26

Dường như dịch vụ, kho lưu trữ và ngữ cảnh của bạn được cho là sẽ hoạt động trong suốt thời gian hoạt động của ứng dụng nhưng điều đó không chính xác. Bạn có thể kích hoạt nhiều bộ hẹn giờ cùng một lúc. Điều đó có nghĩa là nhiều luồng sẽ sử dụng dịch vụ của bạn song song và chúng sẽ thực thi mã dịch vụ của bạn trong bối cảnh thread = của chúng được chia sẻ giữa nhiều luồng => exception vì ngữ cảnh không phải là luồng an toàn.

Tùy chọn duy nhất là sử dụng một cá thể ngữ cảnh mới cho mỗi thao tác bạn muốn thực hiện. Ví dụ, bạn có thể thay đổi các lớp của bạn để chấp nhận nhà máy ngữ cảnh thay vì ngữ cảnh và nhận được một ngữ cảnh mới cho từng hoạt động.

+0

Tôi đã thêm một chút mã để hiển thị cách các dịch vụ được sinh ra. Tôi đang sử dụng một nhiệm vụ mà sinh ra ProcessStaQueue mà sau đó hoạt động ra những gì hoạt động nó nên chạy. Tôi biết Bối cảnh không phải là chủ đề an toàn, nhưng với điều kiện tôi không sử dụng các mẫu thống kê hay mẫu đơn hay bất cứ thứ gì, không nên bối cảnh mới được tạo ra một cách độc lập cho từng tác vụ được thực hiện? – Brandon

+0

Dịch vụ cuối cùng được khởi tạo như sau: ServiceLocator.Current.GetInstance (); Điều này có thể có liên quan gì đến nó? Đoán của tôi là có lẽ là có lẽ ServiceLocator có một cái gì đó để làm với điều này. – Brandon

+0

Nhưng tùy thuộc vào việc thực hiện định vị dịch vụ nếu nó tạo ra một cá thể dịch vụ mới mỗi khi bạn yêu cầu nó hoặc nếu nó tạo ra cá thể chỉ cho yêu cầu đầu tiên và hơn là tái sử dụng cá thể. –

3

Trong trường hợp này giúp bất cứ ai:

Trong trường hợp của tôi, tôi đã đảm bảo rằng các phi thread-safe DbContext đã có một TransientLifetime (sử dụng Ninject), nhưng nó vẫn gây ra vấn đề đồng thời! Hóa ra rằng trong một số tùy chỉnh của tôi ActionFilters Tôi đã sử dụng Dependency Injection để truy cập vào DbContext trong hàm tạo, nhưng ActionFilters có tuổi thọ giúp chúng được khởi tạo qua nhiều yêu cầu, do đó ngữ cảnh không được tạo lại.

Tôi đã sửa lỗi bằng cách giải quyết sự phụ thuộc theo cách thủ công theo phương pháp OnActionExecuting thay vì trong hàm tạo để nó là một cá thể mới mỗi lần.

-1

Trong trường hợp của tôi, tôi đã gặp phải sự cố này vì tôi đã quên từ khóa await trước một trong các hàm gọi hàm DAL của tôi. Đặt await ở đó đã giải quyết.

+0

Awaik làm cho một luồng không đa luồng – Merta

Các vấn đề liên quan