unit ComputeCore;

// Fixed-size worker-thread pool ("compute core") that executes anonymous
// procedures (TProc) as tasks, plus a TTask facade over a lazily created
// process-wide instance (GlobalComputeCore).

interface

uses
  System.SysUtils,
  System.Classes,
  System.SyncObjs,
  Spring.Collections;

const
  // Timeout value meaning "wait forever" (all 32 bits set).
  INFINITE = cardinal($FFFFFFFF);

type
  ITask = interface ['{CA107748-CA21-4570-9615-7CFBB837FCDA}']
    function GetExceptObj: TObject;
    function GetWorkDone: TEvent;
    //
    // Executes the task. Internal use only.
    procedure Execute;
    // Exception object raised in the task (if any). For tasks created by
    // TComputeCore.Run the object is owned by the task; TComputeCore.WaitFor
    // takes it over when it re-raises it, after which this returns nil.
    property ExceptObj: TObject read GetExceptObj;
    // Signalled when the work is done.
    property WorkDone: TEvent read GetWorkDone;
  end;

  IComputeCore = interface ['{FF061A68-2B81-40EB-A374-5C94CF13B610}']
    // Schedules a procedure to be executed in a task as soon as possible.
    function Run(const taskProc: TProc): ITask; overload;
    // Waits for a task to complete execution. While waiting, the calling
    // thread executes other queued tasks. Re-raises the exception captured
    // by 'task' (if any); exceptions of other tasks executed here stay stored
    // in those tasks for their own waiters.
    procedure WaitFor(const task: ITask);
  end;

  TTask = class
    // Schedules a task in the global compute core
    class function Run(const taskProc: TProc): ITask; static;
    // Waits for a task running in the global compute core
    class procedure WaitFor(const task: ITask); static;
  end;

  /// Implementation ///

  // ITask implementation: wraps a TProc, records the exception it raised
  // and signals WorkDone (manual-reset event) when execution ends.
  TCCTask = class(TInterfacedObject, ITask)
  strict private
    FExceptObj: TObject;
    FTaskProc: TProc;
    FWorkDone: TEvent;
  strict protected
    function GetExceptObj: TObject;
    function GetWorkDone: TEvent;
  public
    constructor Create(const taskProc: TProc);
    destructor Destroy; override;
    // Atomically transfers ownership of the captured exception object to the
    // caller. Returns nil if no exception was captured or it was already taken.
    function DetachExceptObj: TObject;
    procedure Execute;
    property ExceptObj: TObject read GetExceptObj;
    property WorkDone: TEvent read GetWorkDone;
  end;

  TCCThread = class;

  IComputeCoreEx = interface ['{2F143547-04AE-4994-B6D6-E691DF5EA384}']
    function AllocateTask(thread: TCCThread; out task: ITask): boolean;
  end;

  TComputeCore = class;

  // Worker thread: sleeps on an auto-reset event (Signal) and, when woken,
  // keeps pulling tasks from its owner until the queue is empty.
  TCCThread = class(TThread)
  strict private
    FIsActive: boolean;
    FOwner_ref: TComputeCore;
    FSignal: TEvent;
  strict protected
    procedure SetSignal(const value: TEvent);
  protected
    procedure TerminatedSet; {$IF CompilerVersion >= 23}override;{$IFEND}
  public
    constructor Create(const owner_ref: TComputeCore); overload;
    destructor Destroy; override;
    {$IF CompilerVersion < 23}
    procedure Terminate;
    {$IFEND}
    procedure Execute; override;
    property IsActive: boolean read FIsActive write FIsActive;
    property Signal: TEvent read FSignal write SetSignal;
  end;

  TComputeCore = class(TInterfacedObject, IComputeCore)
  strict private
    // Stack of idle workers; valid entries are [0..FInactiveTop].
    FInactiveThreads: TArray<TCCThread>;
    FInactiveTop: integer;
    FTasks: IQueue<ITask>;
    // All workers, owned by this object.
    FThreads: TArray<TCCThread>;
  strict protected //internal inline functions
    procedure __Acquire; inline;
    procedure __Release; inline;
  strict protected //thread-unsafe functions, must be called only when internal data is acquired
    // Returns next task to be executed or 'nil' if there is no such task.
    // Returns True when task is assigned.
    function GetTask_U(var task: ITask): boolean;
    // Marks a thread active/inactive
    procedure MarkThreadActive_U(thread: TCCThread; isActive: boolean);
  protected //thread-safe functions
    // Atomically gets next task and sets the thread 'active' state
    // (to True if the task was available, False if not).
    // Returns True if the task was available.
    // If the function returns False, 'task' is guaranteed to be 'nil'.
    function AllocateTask(thread: TCCThread; out task: ITask): boolean;
    // Atomically retrieves one inactive thread and marks it active.
    // Returns False and 'nil' in the 'thread' parameter if there are no
    // inactive threads.
    function ActivateInactiveThread(var thread: TCCThread): boolean;
    // Puts a task into the execution queue.
    procedure QueueTask(const task: ITask);
  public
    constructor Create(numThreads: integer);
    destructor Destroy; override;
    function Run(const taskProc: TProc): ITask; overload;
    procedure WaitFor(const task: ITask);
  end;

var
  GlobalComputeCore: IComputeCore;

implementation

{ TCCTask }

constructor TCCTask.Create(const taskProc: TProc);
begin
  inherited Create;
  FTaskProc := taskProc;
  FWorkDone := TEvent.Create;
end;

destructor TCCTask.Destroy;
begin
  // No wait on FWorkDone here: while a task executes, the executing thread
  // holds an ITask reference to it, so it cannot be destroyed mid-run.
  // Waiting would only block forever on a task that was never executed
  // (e.g. one still queued when its compute core is destroyed).
  // Free an exception that was captured but never handed to a waiter.
  FreeAndNil(FExceptObj);
  FreeAndNil(FWorkDone);
  inherited;
end;

function TCCTask.DetachExceptObj: TObject;
begin
  // Atomic so that concurrent waiters on the same task cannot both obtain
  // (and raise) the same object.
  Result := TObject(TInterlocked.Exchange(PPointer(@FExceptObj)^, nil));
end;

procedure TCCTask.Execute;
begin
  try
    try
      FTaskProc();
    except
      // Capture every raised object, not only Exception descendants: anything
      // escaping here would propagate out of the worker's Execute and end
      // that worker thread.
      FExceptObj := AcquireExceptionObject;
    end;
  finally
    FWorkDone.SetEvent;
  end;
end;

function TCCTask.GetExceptObj: TObject;
begin
  Result := FExceptObj;
end;

function TCCTask.GetWorkDone: TEvent;
begin
  Result := FWorkDone;
end;

{ TCCThread }

constructor TCCThread.Create(const owner_ref: TComputeCore);
begin
  // A non-suspended TThread starts running only after construction
  // completes, so the fields below are set before Execute runs.
  inherited Create(false);
  FOwner_ref := owner_ref;
  FSignal := TEvent.Create(nil, false, false, ''); // auto-reset, initially unsignalled
end;

destructor TCCThread.Destroy;
begin
  // The inherited destructor may call Terminate (-> TerminatedSet, which
  // uses FSignal) and waits for Execute to finish, so FSignal must still
  // exist while it runs.
  inherited;
  FreeAndNil(FSignal);
end;

procedure TCCThread.Execute;
var
  task: ITask;
begin
  NameThreadForDebugging('ComputeCore worker');
  while (not Terminated) and (FSignal.WaitFor <> wrTimeout) do begin
    // Drain the queue; AllocateTask marks this thread inactive (and returns
    // False) as soon as no task is left.
    while (not Terminated) and FOwner_ref.AllocateTask(Self, task) do
      task.Execute;
  end;
end;

procedure TCCThread.SetSignal(const value: TEvent);
begin
  // NOTE(review): the previous event is not freed and the new one will be
  // freed in Destroy - confirm this ownership transfer is intended.
  FSignal := Value;
end;

{$IF CompilerVersion < 23}
procedure TCCThread.Terminate;
begin
  inherited Terminate;
  TerminatedSet;
end;
{$IFEND}

procedure TCCThread.TerminatedSet;
begin
  // Wake the worker so it observes Terminated and leaves Execute.
  if assigned(FSignal) then
    FSignal.SetEvent;
end;

{ TComputeCore }

procedure TComputeCore.__Acquire; //inline
begin
  MonitorEnter(Self);
end;

procedure TComputeCore.__Release; //inline
begin
  MonitorExit(Self);
end;

function TComputeCore.ActivateInactiveThread(var thread: TCCThread): boolean;
begin
  __Acquire;
  try
    Result := FInactiveTop >= 0;
    if Result then begin
      // Pop the idle stack; the thread is no longer in [0..FInactiveTop], so
      // MarkThreadActive_U only flips its IsActive flag.
      thread := FInactiveThreads[FInactiveTop];
      Dec(FInactiveTop);
      MarkThreadActive_U(thread, true);
    end
    else
      thread := nil;
  finally
    __Release;
  end;
end;

function TComputeCore.AllocateTask(thread: TCCThread; out task: ITask): boolean;
begin
  __Acquire;
  try
    MarkThreadActive_U(thread, GetTask_U(task));
  finally
    __Release;
  end;
  Result := assigned(task);
end;

constructor TComputeCore.Create(numThreads: integer);
var
  iThread: integer;
  thread: TCCThread;
begin
  inherited Create;
  Assert(numThreads > 0);
  FTasks := TCollections.CreateQueue<ITask>;
  SetLength(FInactiveThreads, numThreads);
  SetLength(FThreads, numThreads);
  for iThread := 1 to numThreads do begin
    thread := TCCThread.Create(Self);
    thread.IsActive := false;
    FThreads[iThread-1] := thread;
    FInactiveThreads[iThread-1] := thread;
  end;
  FInactiveTop := High(FInactiveThreads);
end;

destructor TComputeCore.Destroy;
var
  thread: TCCThread;
begin
  // Signal every worker first so they all shut down in parallel, then join
  // and free them. Entries may be nil if construction failed part-way.
  for thread in FThreads do
    if assigned(thread) then
      thread.Terminate;
  for thread in FThreads do
    if assigned(thread) then begin
      thread.WaitFor;
      thread.Free;
    end;
  FThreads := nil;
  FInactiveThreads := nil;
  inherited;
end;

function TComputeCore.GetTask_U(var task: ITask): boolean;
begin
  Result := FTasks.TryDequeue(task);
  if not Result then
    task := nil;
end;

procedure TComputeCore.MarkThreadActive_U(thread: TCCTHread; isActive: boolean);
var
  iThread: integer;
begin
  if isActive <> thread.IsActive then begin
    thread.IsActive := isActive;
    if isActive then begin
      // Remove the thread from the idle stack (swap with top, shrink).
      for iThread := 0 to FInactiveTop do
        if FInactiveThreads[iThread] = thread then begin
          if iThread <> FInactiveTop then
            FInactiveThreads[iThread] := FInactiveThreads[FInactiveTop];
          Dec(FInactiveTop);
          break;
        end;
    end
    else begin
      // Push the thread onto the idle stack.
      Inc(FInactiveTop);
      FInactiveThreads[FInactiveTop] := thread;
    end;
  end;
end;

procedure TComputeCore.QueueTask(const task: ITask);
begin
  __Acquire;
  try
    FTasks.Enqueue(task);
  finally
    __Release;
  end;
end;

function TComputeCore.Run(const taskProc: TProc): ITask;
var
  thread: TCCThread;
begin
  Result := TCCTask.Create(taskProc);
  QueueTask(Result);
  // Wake one idle worker (if any); busy workers pick the task up themselves.
  if ActivateInactiveThread(thread) then
    thread.Signal.SetEvent;
end;

procedure TComputeCore.WaitFor(const task: ITask);
var
  exceptObj: TObject;
  newTask: ITask;
begin
  while task.WorkDone.WaitFor(0) = wrTimeout do begin
    // Help out: run a queued task on this thread instead of idling.
    __Acquire;
    try
      GetTask_U(newTask);
    finally
      __Release;
    end;
    if assigned(newTask) then
      // Any exception raised by newTask stays stored in newTask for its own
      // waiter; it is not reported to the waiter of 'task'.
      newTask.Execute
    else
      TThread.Yield;
  end;

  // Take ownership of the captured exception before raising it: the RTL frees
  // a raised object once it is handled, so it must not stay owned by the task.
  if task is TCCTask then
    exceptObj := TCCTask(task).DetachExceptObj
  else
    exceptObj := task.ExceptObj;
  if assigned(exceptObj) then
    raise exceptObj;
end;

{ TTask }

class function TTask.Run(const taskProc: TProc): ITask;
var
  interlockRes: pointer;
  numThreads: integer;
  tmpCC : IComputeCore;
begin
  if not assigned(GlobalComputeCore) then begin
    Assert(NativeUInt(@GlobalComputeCore) mod SizeOf(pointer) = 0, 'TTask.Run: storage is not properly aligned!');
    Assert(NativeUInt(@tmpCC) mod SizeOf(pointer) = 0, 'TTask.Run: tmpCC is not properly aligned!');
    // Leave one CPU to the caller, but always create at least one worker
    // (TComputeCore.Create asserts numThreads > 0).
    numThreads := CPUCount - 1;
    if numThreads < 1 then
      numThreads := 1;
    tmpCC := TComputeCore.Create(numThreads);
    // Publish tmpCC only if no other thread has done so yet.
    interlockRes := TInterlocked.CompareExchange(PPointer(@GlobalComputeCore)^, PPointer(@tmpCC)^, nil);
    // On success GlobalComputeCore now holds tmpCC's reference; clear tmpCC
    // without releasing it. Otherwise tmpCC is released (and destroyed) on exit.
    if interlockRes = nil then
      PPointer(@tmpCC)^ := nil;
  end;
  Result := GlobalComputeCore.Run(taskProc);
end;

class procedure TTask.WaitFor(const task: ITask);
begin
  GlobalComputeCore.WaitFor(task);
end;

end.