Main Page | Class Hierarchy | Class List | File List | Class Members | File Members

worker.c File Reference

#include "exp.h"

Go to the source code of this file.

Defines

#define DELAYED_WORK_QUEUE_PRIORITY   (12 - NORMAL_BASE_PRIORITY)
#define CRITICAL_WORK_QUEUE_PRIORITY   (13 - NORMAL_BASE_PRIORITY)
#define HYPER_CRITICAL_WORK_QUEUE_PRIORITY   (15 - NORMAL_BASE_PRIORITY)
#define MAX_ADDITIONAL_THREADS   16
#define MAX_ADDITIONAL_DYNAMIC_THREADS   16
#define SMALL_NUMBER_OF_THREADS   2
#define MEDIUM_NUMBER_OF_THREADS   3
#define LARGE_NUMBER_OF_THREADS   5
#define DYNAMIC_THREAD_TIMEOUT   ((LONGLONG)10 * 60 * 1000 * 1000 * 10)
#define THREAD_SET_INTERVAL   (1 * 1000 * 1000 * 10)
#define DYNAMIC_WORKER_THREAD   0x80000000

Typedefs

typedef enum _BALANCE_OBJECT BALANCE_OBJECT

Enumerations

enum  _BALANCE_OBJECT { TimerExpiration, ThreadSetManagerEvent, MaximumBalanceObject }

Functions

 C_ASSERT (MaximumBalanceObject< THREAD_WAIT_OBJECTS)
VOID ExpCheckDynamicThreadCount (VOID)
NTSTATUS ExpCreateWorkerThread (WORK_QUEUE_TYPE QueueType, BOOLEAN Dynamic)
VOID ExpDetectWorkerThreadDeadlock (VOID)
VOID ExpWorkerThreadBalanceManager (IN PVOID StartContext)
VOID ExpWorkerThread (IN PVOID StartContext)
PVOID ExpCheckForWorker (IN PVOID p, IN ULONG Size)
BOOLEAN __inline ExpNewThreadNecessary (IN WORK_QUEUE_TYPE QueueType)
BOOLEAN ExpWorkerInitialization (VOID)
VOID ExQueueWorkItem (IN PWORK_QUEUE_ITEM WorkItem, IN WORK_QUEUE_TYPE QueueType)

Variables

EX_WORK_QUEUE ExWorkerQueue [MaximumWorkQueue]
ULONG ExpAdditionalCriticalWorkerThreads
ULONG ExpAdditionalDelayedWorkerThreads
ULONG ExCriticalWorkerThreads
ULONG ExDelayedWorkerThreads
KEVENT ExThreadSetManagerEvent


Define Documentation

#define CRITICAL_WORK_QUEUE_PRIORITY   (13 - NORMAL_BASE_PRIORITY)
 

Definition at line 49 of file worker.c.

Referenced by ExpCreateWorkerThread(), and ExpWorkerThreadBalanceManager().

#define DELAYED_WORK_QUEUE_PRIORITY   (12 - NORMAL_BASE_PRIORITY)
 

Definition at line 48 of file worker.c.

Referenced by ExpCreateWorkerThread().

#define DYNAMIC_THREAD_TIMEOUT   ((LONGLONG)10 * 60 * 1000 * 1000 * 10)
 

Definition at line 67 of file worker.c.

Referenced by ExpWorkerThread().

#define DYNAMIC_WORKER_THREAD   0x80000000
 

Definition at line 80 of file worker.c.

Referenced by ExpCreateWorkerThread(), and ExpWorkerThread().

#define HYPER_CRITICAL_WORK_QUEUE_PRIORITY   (15 - NORMAL_BASE_PRIORITY)
 

Definition at line 50 of file worker.c.

Referenced by ExpCreateWorkerThread().

#define LARGE_NUMBER_OF_THREADS   5
 

Definition at line 61 of file worker.c.

Referenced by ExpWorkerInitialization().

#define MAX_ADDITIONAL_DYNAMIC_THREADS   16
 

Definition at line 57 of file worker.c.

Referenced by ExpDetectWorkerThreadDeadlock(), and ExpNewThreadNecessary().

#define MAX_ADDITIONAL_THREADS   16
 

Definition at line 56 of file worker.c.

Referenced by ExpWorkerInitialization().

#define MEDIUM_NUMBER_OF_THREADS   3
 

Definition at line 60 of file worker.c.

Referenced by ExpWorkerInitialization().

#define SMALL_NUMBER_OF_THREADS   2
 

Definition at line 59 of file worker.c.

Referenced by ExpWorkerInitialization().

#define THREAD_SET_INTERVAL   (1 * 1000 * 1000 * 10)
 

Definition at line 73 of file worker.c.

Referenced by ExpWorkerThreadBalanceManager().


Typedef Documentation

typedef enum _BALANCE_OBJECT BALANCE_OBJECT
 


Enumeration Type Documentation

enum _BALANCE_OBJECT
 

Enumeration values:
TimerExpiration 
ThreadSetManagerEvent 
MaximumBalanceObject 

Definition at line 30 of file worker.c.


Function Documentation

C_ASSERT  ) 
 

Referenced by NtSetInformationProcess(), and RtlSelfRelativeToAbsoluteSD2().

VOID ExpCheckDynamicThreadCount VOID   ) 
 

Definition at line 806 of file worker.c.

References ExpCreateWorkerThread(), ExpNewThreadNecessary(), MaximumWorkQueue, PAGED_CODE, TRUE, and WORK_QUEUE_TYPE.

Referenced by ExpWorkerThreadBalanceManager().

00810 : 00811 00812 This routine is called when there is reason to believe that a work queue 00813 might benefit from the creation of an additional worker thread. 00814 00815 This routine checks each queue to determine whether it would benefit from 00816 an additional worker thread (see ExpNewThreadNecessary()), and creates 00817 one if so. 00818 00819 Arguments: 00820 00821 None. 00822 00823 Return Value: 00824 00825 None. 00826 00827 --*/ 00828 { 00829 WORK_QUEUE_TYPE QueueType; 00830 00831 PAGED_CODE(); 00832 00833 // 00834 // Check each worker queue. 00835 // 00836 00837 for (QueueType = 0; QueueType < MaximumWorkQueue; QueueType++) { 00838 00839 if (ExpNewThreadNecessary(QueueType)) { 00840 00841 // 00842 // Create a new thread for this queue. We explicitly ignore 00843 // an error from ExpCreateDynamicThread(): there's nothing 00844 // we can or should do in the event of a failure. 00845 // 00846 00847 ExpCreateWorkerThread(QueueType, TRUE); 00848 } 00849 } 00850 }

PVOID ExpCheckForWorker IN PVOID  p,
IN ULONG  Size
 

Referenced by ExFreePoolSanityChecks().

NTSTATUS ExpCreateWorkerThread WORK_QUEUE_TYPE  QueueType,
BOOLEAN  Dynamic
 

Definition at line 971 of file worker.c.

References CRITICAL_WORK_QUEUE_PRIORITY, CriticalWorkQueue, DbgPrint, DELAYED_WORK_QUEUE_PRIORITY, DelayedWorkQueue, DYNAMIC_WORKER_THREAD, ExpWorkerThread(), ExWorkerQueue, FALSE, HYPER_CRITICAL_WORK_QUEUE_PRIORITY, HyperCriticalWorkQueue, KernelMode, KeSetBasePriorityThread(), L, NT_SUCCESS, NTSTATUS(), NULL, ObDereferenceObject, ObjectAttributes, ObReferenceObjectByHandle(), PsCreateSystemThread(), PsThreadType, Status, and ThreadHandle.

Referenced by ExpCheckDynamicThreadCount(), ExpDetectWorkerThreadDeadlock(), and ExpWorkerInitialization().

00978 : 00979 00980 This function creates a single new static or dynamic worker thread for 00981 the given queue type. 00982 00983 Arguments: 00984 00985 QueueType - Supplies the type of the queue for which the worker thread 00986 should be created. 00987 00988 Dynamic - If TRUE, the worker thread is created as a dynamic thread that 00989 will terminate after a sufficient period of inactivity. If FALSE, 00990 the worker thread will never terminate. 00991 00992 00993 Return Value: 00994 00995 The final status of the operation. 00996 00997 Notes: 00998 00999 This routine is only called from the worker thread set balance thread, 01000 therefore it will not be reentered. 01001 01002 --*/ 01003 01004 { 01005 OBJECT_ATTRIBUTES ObjectAttributes; 01006 NTSTATUS Status; 01007 HANDLE ThreadHandle; 01008 ULONG Context; 01009 ULONG BasePriority; 01010 PETHREAD Thread; 01011 01012 InitializeObjectAttributes(&ObjectAttributes, NULL, 0, NULL, NULL); 01013 01014 Context = QueueType; 01015 if (Dynamic != FALSE) { 01016 Context |= DYNAMIC_WORKER_THREAD; 01017 } 01018 01019 Status = PsCreateSystemThread(&ThreadHandle, 01020 THREAD_ALL_ACCESS, 01021 &ObjectAttributes, 01022 0L, 01023 NULL, 01024 ExpWorkerThread, 01025 (PVOID)Context); 01026 if (!NT_SUCCESS(Status)) { 01027 #if DBG 01028 DbgPrint("EXWORKER: Worker thread creation failed, status %08x\n", 01029 Status); 01030 #endif 01031 return Status; 01032 } 01033 01034 if (Dynamic != FALSE) { 01035 01036 #if DBG 01037 DbgPrint("EXWORKER: Created dynamic thread type %d, %d total\n", 01038 QueueType, 01039 ExWorkerQueue[QueueType].DynamicThreadCount); 01040 #endif 01041 01042 InterlockedIncrement( &ExWorkerQueue[QueueType].DynamicThreadCount ); 01043 } 01044 01045 // 01046 // Set the priority according to the type of worker thread. 01047 // 01048 01049 switch (QueueType) { 01050 01051 case HyperCriticalWorkQueue: 01052 01053 BasePriority = HYPER_CRITICAL_WORK_QUEUE_PRIORITY; 01054 break; 01055 01056 case CriticalWorkQueue: 01057 01058 BasePriority = CRITICAL_WORK_QUEUE_PRIORITY; 01059 break; 01060 01061 case DelayedWorkQueue: 01062 01063 BasePriority = DELAYED_WORK_QUEUE_PRIORITY; 01064 break; 01065 } 01066 01067 // 01068 // Set the base priority of the just-created thread. 01069 // 01070 01071 Status = ObReferenceObjectByHandle( ThreadHandle, 01072 THREAD_SET_INFORMATION, 01073 PsThreadType, 01074 KernelMode, 01075 (PVOID *)&Thread, 01076 NULL ); 01077 if (NT_SUCCESS(Status)) { 01078 01079 KeSetBasePriorityThread( &Thread->Tcb, BasePriority ); 01080 ObDereferenceObject( Thread ); 01081 01082 } else { 01083 01084 // 01085 // The thread was created but we were unable to reference it. This is 01086 // very odd... just leave it at the default priority, better than no 01087 // thread at all. 01088 // 01089 01090 } 01091 01092 ZwClose( ThreadHandle ); 01093 return Status; 01094 }

VOID ExpDetectWorkerThreadDeadlock VOID   ) 
 

Definition at line 853 of file worker.c.

References ASSERT, DbgPrint, _EX_WORK_QUEUE::DynamicThreadCount, ExpCreateWorkerThread(), ExWorkerQueue, Index, KeReadStateQueue(), MAX_ADDITIONAL_DYNAMIC_THREADS, MaximumWorkQueue, NTSTATUS(), ObjectAttributes, PAGED_CODE, _EX_WORK_QUEUE::QueueDepthLastPass, Status, TRUE, _EX_WORK_QUEUE::WorkerQueue, _EX_WORK_QUEUE::WorkItemsProcessed, and _EX_WORK_QUEUE::WorkItemsProcessedLastPass.

Referenced by ExpWorkerThreadBalanceManager().

00857 : 00858 00859 This function creates new work item threads if a possible deadlock is 00860 detected. 00861 00862 Arguments: 00863 00864 None. 00865 00866 Return Value: 00867 00868 None 00869 00870 --*/ 00871 00872 { 00873 LONG QueueDepth; 00874 ULONG Index; 00875 LONG ThreadCount; 00876 LONG NewThreadCount; 00877 NTSTATUS Status; 00878 HANDLE Thread; 00879 OBJECT_ATTRIBUTES ObjectAttributes; 00880 PEX_WORK_QUEUE Queue; 00881 00882 PAGED_CODE(); 00883 00884 // 00885 // Process each queue type. 00886 // 00887 00888 for (Index = 0; Index < MaximumWorkQueue; Index += 1) { 00889 00890 Queue = &ExWorkerQueue[Index]; 00891 00892 ASSERT( Queue->DynamicThreadCount <= 00893 MAX_ADDITIONAL_DYNAMIC_THREADS ); 00894 00895 if (Queue->QueueDepthLastPass > 0 && 00896 00897 Queue->WorkItemsProcessed == 00898 Queue->WorkItemsProcessedLastPass && 00899 00900 Queue->DynamicThreadCount < 00901 MAX_ADDITIONAL_DYNAMIC_THREADS) { 00902 00903 // 00904 // These things are known: 00905 // 00906 // - There were work items waiting in the queue at the last pass. 00907 // - No work items have been processed since the last pass. 00908 // - We haven't yet created the maximum number of dynamic threads. 00909 // 00910 // Things look like they're stuck, create a new thread. 00911 // 00912 00913 #if DBG 00914 DbgPrint("EXWORKER: Work item deadlock detected, creating " 00915 "type %d worker thread\n", 00916 Index ); 00917 #endif 00918 00919 // 00920 // Create a new thread for this queue. We explicitly ignore 00921 // an error from ExpCreateDynamicThread(): we'll try again in 00922 // another detection period if the queue looks like it's still 00923 // stuck. 00924 // 00925 00926 ExpCreateWorkerThread(Index, TRUE); 00927 } 00928 00929 // 00930 // Update some bookkeeping. 00931 // 00932 // Note that WorkItemsProcessed and the queue depth must be recorded 00933 // in that order to avoid getting a false deadlock indication. 00934 // 00935 00936 Queue->WorkItemsProcessedLastPass = Queue->WorkItemsProcessed; 00937 Queue->QueueDepthLastPass = KeReadStateQueue( &Queue->WorkerQueue ); 00938 } 00939 }

BOOLEAN __inline ExpNewThreadNecessary IN WORK_QUEUE_TYPE  QueueType  ) 
 

Definition at line 159 of file worker.c.

References _KQUEUE::CurrentCount, _EX_WORK_QUEUE::DynamicThreadCount, _KQUEUE::EntryListHead, ExWorkerQueue, FALSE, _EX_WORK_QUEUE::MakeThreadsAsNecessary, MAX_ADDITIONAL_DYNAMIC_THREADS, _KQUEUE::MaximumCount, PEX_WORK_QUEUE, TRUE, and _EX_WORK_QUEUE::WorkerQueue.

Referenced by ExpCheckDynamicThreadCount(), and ExQueueWorkItem().

00165 : 00166 00167 This function checks the supplied worker queue and determines whether 00168 it is appropriate to spin up a dynamic worker thread for that queue. 00169 00170 Arguments: 00171 00172 QueueType - Supplies the type of the queue that should be examined. 00173 00174 Return Value: 00175 00176 TRUE if the given work queue would benefit from the creation of an 00177 additional thread, FALSE if not. 00178 00179 --*/ 00180 { 00181 PEX_WORK_QUEUE Queue; 00182 00183 Queue = &ExWorkerQueue[QueueType]; 00184 00185 if (Queue->MakeThreadsAsNecessary != FALSE && 00186 IsListEmpty( &Queue->WorkerQueue.EntryListHead ) == FALSE && 00187 Queue->WorkerQueue.CurrentCount < Queue->WorkerQueue.MaximumCount && 00188 Queue->DynamicThreadCount < MAX_ADDITIONAL_DYNAMIC_THREADS) { 00189 00190 // 00191 // We know these things: 00192 // 00193 // - This queue is eligible for dynamic creation of threads to try 00194 // to keep the CPUs busy, 00195 // 00196 // - There are work items waiting in the queue, 00197 // 00198 // - The number of runable worker threads for this queue is less than 00199 // the number of processors on this system, and 00200 // 00201 // - We haven't reached the maximum dynamic thread count. 00202 // 00203 // An additional worker thread at this point will help clear the 00204 // backlog. 00205 // 00206 00207 return TRUE; 00208 00209 } else { 00210 00211 // 00212 // One of the above conditions is false. 00213 // 00214 00215 return FALSE; 00216 } 00217 }

BOOLEAN ExpWorkerInitialization VOID   ) 
 

Definition at line 220 of file worker.c.

References CriticalWorkQueue, DelayedWorkQueue, ExCriticalWorkerThreads, ExDelayedWorkerThreads, ExpAdditionalCriticalWorkerThreads, ExpAdditionalDelayedWorkerThreads, ExpCreateWorkerThread(), ExpWorkerThreadBalanceManager(), ExThreadSetManagerEvent, ExWorkerQueue, FALSE, HyperCriticalWorkQueue, Index, KeInitializeEvent, KeInitializeQueue(), L, LARGE_NUMBER_OF_THREADS, _EX_WORK_QUEUE::MakeThreadsAsNecessary, MAX_ADDITIONAL_THREADS, MaximumWorkQueue, MEDIUM_NUMBER_OF_THREADS, MmIsThisAnNtAsSystem(), MmLargeSystem, MmMediumSystem, MmNumberOfPhysicalPages, MmQuerySystemSize(), MmSmallSystem, NT_SUCCESS, NTSTATUS(), NULL, ObjectAttributes, PAGE_SIZE, PsCreateSystemThread(), SMALL_NUMBER_OF_THREADS, Status, TRUE, and WORK_QUEUE_TYPE.

00224 { 00225 00226 ULONG Index; 00227 OBJECT_ATTRIBUTES ObjectAttributes; 00228 ULONG NumberOfDelayedThreads; 00229 ULONG NumberOfCriticalThreads; 00230 NTSTATUS Status; 00231 HANDLE Thread; 00232 BOOLEAN NtAs; 00233 WORK_QUEUE_TYPE WorkQueueType; 00234 00235 // 00236 // Set the number of worker threads based on the system size. 00237 // 00238 00239 NtAs = MmIsThisAnNtAsSystem(); 00240 switch (MmQuerySystemSize()) { 00241 case MmSmallSystem: 00242 NumberOfDelayedThreads = MEDIUM_NUMBER_OF_THREADS; 00243 if (MmNumberOfPhysicalPages > ((12*1024*1024)/PAGE_SIZE) ) { 00244 NumberOfCriticalThreads = MEDIUM_NUMBER_OF_THREADS; 00245 } else { 00246 NumberOfCriticalThreads = SMALL_NUMBER_OF_THREADS; 00247 } 00248 break; 00249 00250 case MmMediumSystem: 00251 NumberOfDelayedThreads = MEDIUM_NUMBER_OF_THREADS; 00252 NumberOfCriticalThreads = MEDIUM_NUMBER_OF_THREADS; 00253 if ( NtAs ) { 00254 NumberOfCriticalThreads += MEDIUM_NUMBER_OF_THREADS; 00255 } 00256 break; 00257 00258 case MmLargeSystem: 00259 NumberOfDelayedThreads = MEDIUM_NUMBER_OF_THREADS; 00260 NumberOfCriticalThreads = LARGE_NUMBER_OF_THREADS; 00261 if ( NtAs ) { 00262 NumberOfCriticalThreads += LARGE_NUMBER_OF_THREADS; 00263 } 00264 break; 00265 00266 default: 00267 NumberOfDelayedThreads = SMALL_NUMBER_OF_THREADS; 00268 NumberOfCriticalThreads = SMALL_NUMBER_OF_THREADS; 00269 } 00270 00271 00272 // 00273 // Initialize the work Queue objects. 00274 // 00275 00276 if ( ExpAdditionalCriticalWorkerThreads > MAX_ADDITIONAL_THREADS ) { 00277 ExpAdditionalCriticalWorkerThreads = MAX_ADDITIONAL_THREADS; 00278 } 00279 00280 if ( ExpAdditionalDelayedWorkerThreads > MAX_ADDITIONAL_THREADS ) { 00281 ExpAdditionalDelayedWorkerThreads = MAX_ADDITIONAL_THREADS; 00282 } 00283 00284 // 00285 // Initialize the ExWorkerQueue[] array. 00286 // 00287 00288 for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++) { 00289 00290 RtlZeroMemory(&ExWorkerQueue[WorkQueueType], 00291 sizeof(EX_WORK_QUEUE)); 00292 00293 KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue, 0); 00294 } 00295 00296 // 00297 // We only create dynamic threads for the critical work queue (note 00298 // this doesn't apply to dynamic threads created to break deadlocks.) 00299 // 00300 // The rationale is this: folks who use the delayed work queue are 00301 // not time critical, and the hypercritical queue is used rarely 00302 // by folks who are non-blocking. 00303 // 00304 00305 ExWorkerQueue[CriticalWorkQueue].MakeThreadsAsNecessary = TRUE; 00306 00307 // 00308 // Initialize the global thread set manager event. 00309 // 00310 00311 KeInitializeEvent(&ExThreadSetManagerEvent, 00312 SynchronizationEvent, 00313 FALSE); 00314 00315 // 00316 // Create the desired number of executive worker threads for each 00317 // of the work queues. 00318 // 00319 00320 InitializeObjectAttributes(&ObjectAttributes, NULL, 0, NULL, NULL); 00321 00322 // 00323 // Create any builtin critical and delayed worker threads. 00324 // 00325 00326 for (Index = 0; Index < (NumberOfCriticalThreads + ExpAdditionalCriticalWorkerThreads); Index += 1) { 00327 00328 // 00329 // Create a worker thread to service the critical work queue. 00330 // 00331 00332 Status = ExpCreateWorkerThread( CriticalWorkQueue, FALSE ); 00333 if (!NT_SUCCESS(Status)) { 00334 break; 00335 } 00336 ExCriticalWorkerThreads++; 00337 } 00338 00339 00340 for (Index = 0; Index < (NumberOfDelayedThreads + ExpAdditionalDelayedWorkerThreads); Index += 1) { 00341 00342 // 00343 // Create a worker thread to service the delayed work queue. 00344 // 00345 00346 Status = ExpCreateWorkerThread( DelayedWorkQueue, FALSE ); 00347 if (!NT_SUCCESS(Status)) { 00348 break; 00349 } 00350 00351 ExDelayedWorkerThreads++; 00352 } 00353 00354 Status = ExpCreateWorkerThread( HyperCriticalWorkQueue, FALSE ); 00355 00356 // 00357 // Create the worker thread set manager thread. 00358 // 00359 00360 Status = PsCreateSystemThread(&Thread, 00361 THREAD_ALL_ACCESS, 00362 &ObjectAttributes, 00363 0L, 00364 NULL, 00365 ExpWorkerThreadBalanceManager, 00366 NULL); 00367 if (NT_SUCCESS(Status)) { 00368 ZwClose( Thread ); 00369 } 00370 00371 return (BOOLEAN)NT_SUCCESS(Status); 00372 }

VOID ExpWorkerThread IN PVOID  StartContext  ) 
 

Definition at line 538 of file worker.c.

References _ETHREAD::ActiveImpersonationInfo, CriticalWorkQueue, DbgPrint, DYNAMIC_THREAD_TIMEOUT, DYNAMIC_WORKER_THREAD, _EX_WORK_QUEUE::DynamicThreadCount, ExWorkerQueue, FALSE, HyperCriticalWorkQueue, IoRemoteBootClient, _ETHREAD::IrpList, KeBugCheckEx(), KeRemoveQueue(), _KTHREAD::KernelApcDisable, KernelMode, KeSetKernelStackSwapEnable(), KPROCESSOR_MODE, List, MmIsThisAnNtAsSystem(), NULL, _WORK_QUEUE_ITEM::Parameter, PsGetCurrentThread, PWORKER_THREAD_ROUTINE, _ETHREAD::Tcb, TRUE, UserMode, WORK_QUEUE_TYPE, _EX_WORK_QUEUE::WorkerQueue, _WORK_QUEUE_ITEM::WorkerRoutine, and _EX_WORK_QUEUE::WorkItemsProcessed.

Referenced by ExpCreateWorkerThread().

00542 { 00543 00544 PLIST_ENTRY Entry; 00545 WORK_QUEUE_TYPE QueueType; 00546 PWORK_QUEUE_ITEM WorkItem; 00547 KPROCESSOR_MODE WaitMode; 00548 LARGE_INTEGER TimeoutValue; 00549 PLARGE_INTEGER Timeout; 00550 PETHREAD Thread; 00551 BOOLEAN DynamicThread; 00552 PEX_WORK_QUEUE WorkerQueue; 00553 PVOID WorkerRoutine; 00554 PVOID Parameter; 00555 00556 WaitMode = UserMode; 00557 00558 // 00559 // Set timeout value etc according to whether we are static or dynamic. 00560 // 00561 00562 if (((ULONG_PTR)StartContext & DYNAMIC_WORKER_THREAD) == 0) { 00563 00564 // 00565 // We are being created as a static thread. As such it will not 00566 // terminate, so there is no point in timing out waiting for a work 00567 // item. 00568 // 00569 00570 Timeout = NULL; 00571 00572 } else { 00573 00574 // 00575 // This is a dynamic worker thread. It has a non-infinite timeout 00576 // so that it can eventually terminate. 00577 // 00578 00579 TimeoutValue.QuadPart = -DYNAMIC_THREAD_TIMEOUT; 00580 Timeout = &TimeoutValue; 00581 } 00582 00583 Thread = PsGetCurrentThread(); 00584 00585 // 00586 // If the thread is a critical worker thread, then set the thread 00587 // priority to the lowest realtime level. Otherwise, set the base 00588 // thread priority to time critical. 00589 // 00590 00591 QueueType = (WORK_QUEUE_TYPE) 00592 ((ULONG_PTR)StartContext & ~DYNAMIC_WORKER_THREAD); 00593 00594 WorkerQueue = &ExWorkerQueue[QueueType]; 00595 00596 switch ( QueueType ) { 00597 00598 case HyperCriticalWorkQueue: 00599 00600 // 00601 // Always make stack for this thread resident 00602 // so that worker pool deadlock magic can run 00603 // even when what we are trying to do is inpage 00604 // the hyper critical worker thread's stack. 00605 // Without this fix, we hold the process lock 00606 // but this thread's stack can't come in, and 00607 // the deadlock detection cannot create new threads 00608 // to break the system deadlock. 00609 // 00610 00611 WaitMode = KernelMode; 00612 break; 00613 00614 case CriticalWorkQueue: 00615 if ( MmIsThisAnNtAsSystem() ) { 00616 WaitMode = KernelMode; 00617 } 00618 00619 break; 00620 } 00621 00622 #if defined(REMOTE_BOOT) 00623 // 00624 // In diskless NT scenarios ensure that the kernel stack of the worker 00625 // threads will not be swapped out. 00626 // 00627 00628 if (IoRemoteBootClient) { 00629 KeSetKernelStackSwapEnable(FALSE); 00630 } 00631 #endif // defined(REMOTE_BOOT) 00632 00633 // 00634 // Loop forever waiting for a work queue item, calling the processing 00635 // routine, and then waiting for another work queue item. 00636 // 00637 00638 do { 00639 00640 while (TRUE) { 00641 00642 // 00643 // Wait until something is put in the queue or until we time out. 00644 // 00645 // By specifying a wait mode of UserMode, the thread's kernel 00646 // stack is swappable. 00647 // 00648 00649 Entry = KeRemoveQueue(&WorkerQueue->WorkerQueue, 00650 WaitMode, 00651 Timeout); 00652 if ((ULONG_PTR)Entry != STATUS_TIMEOUT) { 00653 00654 // 00655 // This is a real work item, break out of the timeout loop 00656 // and go process. 00657 // 00658 00659 break; 00660 } 00661 00662 // 00663 // These things are known: 00664 // 00665 // - Static worker threads do not time out, so this is a dynamic 00666 // worker thread. 00667 // 00668 // - This thread has been waiting for a long time with nothing 00669 // to do. 00670 // 00671 00672 if (IsListEmpty( &Thread->IrpList ) == FALSE) { 00673 00674 // 00675 // There is still I/O pending, can't terminate yet. 00676 // 00677 00678 continue; 00679 } 00680 00681 // 00682 // This dynamic thread can be terminated. 00683 // 00684 00685 #if DBG 00686 DbgPrint("EXWORKER: Dynamic type %d thread no longer needed," 00687 " terminating.\n", 00688 QueueType ); 00689 #endif 00690 00691 InterlockedDecrement( 00692 &WorkerQueue->DynamicThreadCount ); 00693 00694 #if defined(REMOTE_BOOT) 00695 // 00696 // We will bugcheck if we terminate a thread with stack swapping 00697 // disabled. 00698 // 00699 00700 if (IoRemoteBootClient) { 00701 KeSetKernelStackSwapEnable(TRUE); 00702 } 00703 #endif // defined(REMOTE_BOOT) 00704 00705 return; 00706 } 00707 00708 // 00709 // Update the total number of work items processed. 00710 // 00711 00712 InterlockedIncrement( &WorkerQueue->WorkItemsProcessed ); 00713 00714 WorkItem = CONTAINING_RECORD(Entry, WORK_QUEUE_ITEM, List); 00715 WorkerRoutine = WorkItem->WorkerRoutine; 00716 Parameter = WorkItem->Parameter; 00717 00718 // 00719 // Execute the specified routine. 00720 // 00721 00722 #if DBG 00723 00724 try { 00725 00726 ((PWORKER_THREAD_ROUTINE)WorkerRoutine)(Parameter); 00727 00728 if (KeGetCurrentIrql() != 0) { 00729 DbgPrint("EXWORKER: worker exit at IRQL %d, worker routine %x, " 00730 "parameter %x, item %x\n", 00731 KeGetCurrentIrql(), WorkerRoutine, Parameter, WorkItem); 00732 00733 DbgBreakPoint(); 00734 } 00735 00736 // 00737 // Catch worker routines that forget to do KeLeaveCriticalRegion. 00738 // 00739 if (Thread->Tcb.KernelApcDisable != 0) { 00740 DbgPrint("EXWORKER: worker exit with APCs disabled, worker routine %x, " 00741 "parameter %x, item %x\n", 00742 WorkerRoutine, Parameter, WorkItem); 00743 00744 DbgBreakPoint(); 00745 Thread->Tcb.KernelApcDisable = 0; 00746 } 00747 00748 if (Thread->ActiveImpersonationInfo) { 00749 KeBugCheckEx( 00750 IMPERSONATING_WORKER_THREAD, 00751 (ULONG_PTR)WorkerRoutine, 00752 (ULONG_PTR)Parameter, 00753 (ULONG_PTR)WorkItem, 00754 0); 00755 } 00756 00757 } except( ExpWorkerThreadFilter(WorkerRoutine, 00758 Parameter, 00759 GetExceptionInformation() )) { 00760 } 00761 00762 #else 00763 00764 ((PWORKER_THREAD_ROUTINE)WorkerRoutine)(Parameter); 00765 00766 // 00767 // Catch worker routines that forget to do KeLeaveCriticalRegion. 00768 // It has to be zero at this point. In the debug case we enter a 00769 // breakpoint. In the non-debug case just zero the flag so that 00770 // APCs can continue to fire to this thread. 00771 // 00772 00773 if (Thread->Tcb.KernelApcDisable != 0) { 00774 DbgPrint("EXWORKER: worker exit with APCs disabled, worker routine %x, " 00775 "parameter %x, item %x\n", 00776 WorkerRoutine, Parameter, WorkItem); 00777 00778 Thread->Tcb.KernelApcDisable = 0; 00779 } 00780 00781 if (KeGetCurrentIrql() != 0) { 00782 KeBugCheckEx( 00783 WORKER_THREAD_RETURNED_AT_BAD_IRQL, 00784 (ULONG_PTR)WorkerRoutine, 00785 (ULONG_PTR)KeGetCurrentIrql(), 00786 (ULONG_PTR)Parameter, 00787 (ULONG_PTR)WorkItem 00788 ); 00789 } 00790 00791 if (Thread->ActiveImpersonationInfo) { 00792 KeBugCheckEx( 00793 IMPERSONATING_WORKER_THREAD, 00794 (ULONG_PTR)WorkerRoutine, 00795 (ULONG_PTR)Parameter, 00796 (ULONG_PTR)WorkItem, 00797 0 00798 ); 00799 } 00800 #endif 00801 00802 } while(TRUE); 00803 }

VOID ExpWorkerThreadBalanceManager IN PVOID  StartContext  ) 
 

Definition at line 431 of file worker.c.

References CRITICAL_WORK_QUEUE_PRIORITY, Executive, ExpCheckDynamicThreadCount(), ExpDetectWorkerThreadDeadlock(), ExThreadSetManagerEvent, FALSE, KeGetCurrentThread, KeInitializeTimer(), KernelMode, KeSetBasePriorityThread(), KeSetTimer(), KeWaitForMultipleObjects(), MaximumBalanceObject, NTSTATUS(), NULL, PAGED_CODE, Status, THREAD_SET_INTERVAL, ThreadSetManagerEvent, TimerExpiration, and TRUE.

Referenced by ExpWorkerInitialization().

00437 : 00438 00439 This function is the startup code for the worker thread manager thread. 00440 The worker thread manager thread is created during system initialization 00441 and begins execution in this function. 00442 00443 This thread is responsible for detecting and breaking circular deadlocks 00444 in the system worker thread queues. It will also create and destroy 00445 additional worker threads as needed based on loading. 00446 00447 Arguments: 00448 00449 Context - Supplies a pointer to an arbitrary data structure (NULL). 00450 00451 Return Value: 00452 00453 None. 00454 00455 --*/ 00456 { 00457 KTIMER PeriodTimer; 00458 LARGE_INTEGER DueTime; 00459 PVOID WaitObjects[MaximumBalanceObject]; 00460 NTSTATUS Status; 00461 00462 PAGED_CODE(); 00463 00464 // 00465 // Raise the thread priority to just higher than the priority of the 00466 // critical work queue. 00467 // 00468 00469 KeSetBasePriorityThread(KeGetCurrentThread(), 00470 CRITICAL_WORK_QUEUE_PRIORITY+1); 00471 00472 // 00473 // Initialize the periodic timer and set the manager period. 00474 // 00475 00476 KeInitializeTimer(&PeriodTimer); 00477 DueTime.QuadPart = - THREAD_SET_INTERVAL; 00478 00479 // 00480 // Initialize the wait object array. 00481 // 00482 00483 WaitObjects[TimerExpiration] = (PVOID)&PeriodTimer; 00484 WaitObjects[ThreadSetManagerEvent] = (PVOID)&ExThreadSetManagerEvent; 00485 00486 // 00487 // Loop forever processing events. 00488 // 00489 00490 while( TRUE ) { 00491 00492 // 00493 // Set the timer to expire at the next periodic interval. 00494 // 00495 00496 KeSetTimer(&PeriodTimer, DueTime, NULL); 00497 00498 // 00499 // Wake up when the timer expires or the set manager event is 00500 // signalled. 00501 // 00502 00503 Status = KeWaitForMultipleObjects(MaximumBalanceObject, 00504 WaitObjects, 00505 WaitAny, 00506 Executive, 00507 KernelMode, 00508 FALSE, 00509 NULL, 00510 NULL); 00511 00512 switch (Status) { 00513 00514 case TimerExpiration: 00515 00516 // 00517 // Periodic timer expiration - go see if any work queues 00518 // are deadlocked. 00519 // 00520 00521 ExpDetectWorkerThreadDeadlock(); 00522 break; 00523 00524 case ThreadSetManagerEvent: 00525 00526 // 00527 // Someone has asked us to check some metrics to determine 00528 // whether we should create another worker thread. 00529 // 00530 00531 ExpCheckDynamicThreadCount(); 00532 break; 00533 } 00534 } 00535 }

VOID ExQueueWorkItem IN PWORK_QUEUE_ITEM  WorkItem,
IN WORK_QUEUE_TYPE  QueueType
 

Definition at line 375 of file worker.c.

References ASSERT, ExpNewThreadNecessary(), ExThreadSetManagerEvent, ExWorkerQueue, FALSE, KeInsertQueue(), KeSetEvent(), MaximumWorkQueue, and NULL.

Referenced by CcPostWorkQueue(), CmpClaimGlobalQuota(), CmpDiskFullWarning(), CmpLazyFlushDpcRoutine(), CmpPostNotify(), IopChainDereferenceComplete(), IopCompleteUnloadOrDelete(), IopDeviceEjectComplete(), IopEjectDevice(), IopErrorLogDpc(), IopProcessNewProfile(), IopQueueDeviceWorkItem(), IopRequestDeviceAction(), IopSendMessageToTrackService(), IoQueueWorkItem(), IoRaiseHardError(), IoRaiseInformationalHardError(), IoReportTargetDeviceChangeAsynchronous(), IovpInternalDeferredCompletion(), IoWriteErrorLogEntry(), KdpTimeSlipDpcRoutine(), KdpTrap(), MiCheckForCrashDump(), NtLoadDriver(), NtUnloadDriver(), ObfDereferenceObject(), SepInformFileSystemsOfDeletedLogon(), SepQueueWorkItem(), SmbTraceDereferenceHeap(), UdfAddToWorkque(), and UdfQueueClose().

00382 : 00383 00384 This function inserts a work item into a work queue that is processed 00385 by a worker thread of the corresponding type. 00386 00387 Arguments: 00388 00389 WorkItem - Supplies a pointer to the work item to add the the queue. 00390 This structure must be located in NonPagedPool. The work item 00391 structure contains a doubly linked list entry, the address of a 00392 routine to call and a parameter to pass to that routine. 00393 00394 QueueType - Specifies the type of work queue that the work item 00395 should be placed in. 00396 00397 Return Value: 00398 00399 None 00400 00401 --*/ 00402 00403 { 00404 00405 ASSERT(QueueType < MaximumWorkQueue); 00406 ASSERT(WorkItem->List.Flink == NULL); 00407 00408 // 00409 // Insert the work item in the appropriate queue object. 00410 // 00411 00412 KeInsertQueue(&ExWorkerQueue[QueueType].WorkerQueue, &WorkItem->List); 00413 00414 // 00415 // Determine whether another thread should be created, and signal the 00416 // thread set balance manager if so. 00417 // 00418 00419 if (ExpNewThreadNecessary(QueueType) != FALSE) { 00420 00421 KeSetEvent( &ExThreadSetManagerEvent, 00422 0, 00423 FALSE ); 00424 } 00425 00426 return; 00427 }


Variable Documentation

ULONG ExCriticalWorkerThreads
 

Definition at line 95 of file worker.c.

Referenced by CcInitializeCacheManager(), and ExpWorkerInitialization().

ULONG ExDelayedWorkerThreads
 

Definition at line 96 of file worker.c.

Referenced by ExpWorkerInitialization().

ULONG ExpAdditionalCriticalWorkerThreads
 

Definition at line 92 of file worker.c.

Referenced by ExpWorkerInitialization().

ULONG ExpAdditionalDelayedWorkerThreads
 

Definition at line 93 of file worker.c.

Referenced by ExpWorkerInitialization().

KEVENT ExThreadSetManagerEvent
 

Definition at line 102 of file worker.c.

Referenced by ExpWorkerInitialization(), ExpWorkerThreadBalanceManager(), and ExQueueWorkItem().

EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue]
 

Definition at line 86 of file worker.c.

Referenced by ExpCheckForWorker(), ExpCreateWorkerThread(), ExpDetectWorkerThreadDeadlock(), ExpNewThreadNecessary(), ExpWorkerInitialization(), ExpWorkerThread(), ExQueueWorkItem(), KeRemoveQueue(), and KeTerminateThread().


Generated on Sat May 15 19:46:09 2004 for test by doxygen 1.3.7