Scippy

    SCIP

    Solving Constraint Integer Programs

    tpi_tnycthrd.c
    Go to the documentation of this file.
    1/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
    2/* */
    3/* This file is part of the program and library */
    4/* SCIP --- Solving Constraint Integer Programs */
    5/* */
    6/* Copyright (c) 2002-2025 Zuse Institute Berlin (ZIB) */
    7/* */
    8/* Licensed under the Apache License, Version 2.0 (the "License"); */
    9/* you may not use this file except in compliance with the License. */
    10/* You may obtain a copy of the License at */
    11/* */
    12/* http://www.apache.org/licenses/LICENSE-2.0 */
    13/* */
    14/* Unless required by applicable law or agreed to in writing, software */
    15/* distributed under the License is distributed on an "AS IS" BASIS, */
    16/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
    17/* See the License for the specific language governing permissions and */
    18/* limitations under the License. */
    19/* */
    20/* You should have received a copy of the Apache-2.0 license */
    21/* along with SCIP; see the file LICENSE. If not visit scipopt.org. */
    22/* */
    23/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
    24
    25/**@file tpi_tnycthrd.c
    26 * @ingroup TASKINTERFACE
    27 * @brief a TPI implementation using tinycthreads
    28 * @author Stephen J. Maher
    29 * @author Leona Gottwald
    30 * @author Marc Pfetsch
    31 */
    32
    33/*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
    34
    35#include "tpi/tpi.h"
    37#include "tinycthread/tinycthread.h"
    38#include "scip/pub_message.h"
    39#include "scip/pub_misc.h"
    40
    41/* macros for direct access */
    42
    43/* lock */
    44#define SCIPtnyInitLock(lock) ( mtx_init((lock), mtx_plain) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
    45#define SCIPtnyDestroyLock(lock) ( mtx_destroy(lock) )
    46#define SCIPtnyAcquireLock(lock) ( mtx_lock(lock) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
    47#define SCIPtnyReleaseLock(lock) ( mtx_unlock(lock) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
    48
    49/* condition */
    50#define SCIPtnyInitCondition(condition) ( cnd_init(condition) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
    51#define SCIPtnyDestroyCondition(condition) ( cnd_destroy(condition) )
    52#define SCIPtnySignalCondition(condition) ( cnd_signal(condition) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
    53#define SCIPtnyBroadcastCondition(condition) ( cnd_broadcast(condition) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
    54#define SCIPtnyWaitCondition(condition, lock) ( cnd_wait((condition), (lock)) == thrd_success ? SCIP_OKAY: SCIP_ERROR )
    55
    56/** struct containing lock */
    57struct SCIP_Lock
    58{
    59 mtx_t lock;
    60};
    61
    62/** struct containing condition */
    63struct SCIP_Condition
    64{
    65 cnd_t condition;
    66};
    67
    68
    70static SCIP_THREADPOOL* _threadpool = NULL;
    71_Thread_local int _threadnumber; /*lint !e129*/
    72
    73/** A job added to the queue */
    74struct SCIP_Job
    75{
    76 int jobid; /**< id to identify jobs from a common process */
    77 struct SCIP_Job* nextjob; /**< pointer to the next job in the queue */
    78 SCIP_RETCODE (*jobfunc)(void* args);/**< pointer to the job function */
    79 void* args; /**< pointer to the function arguments */
    80 SCIP_RETCODE retcode; /**< return code of the job */
    81};
    82
    83/** the thread pool job queue */
    84struct SCIP_JobQueue
    85{
    86 SCIP_JOB* firstjob; /**< pointer to the first job in the queue */
    87 SCIP_JOB* lastjob; /**< pointer to the last job in the queue */
    88 int njobs; /**< number of jobs in the queue */
    89};
    91
    92/** The thread pool */
    94{
    95 /* Pool Characteristics */
    96 int nthreads; /**< number of threads in the pool */
    97 int queuesize; /**< the total number of items to enter the queue */
    98
    99 /* Current pool state */
    100 thrd_t* threads; /**< the threads included in the pool */
    101 SCIP_JOBQUEUE* jobqueue; /**< the job queue */
    102 SCIP_JOBQUEUE* currentjobs; /**< the jobs currently being processed on a thread;
    103 * only a single job is allowed per thread. */
    104 SCIP_JOBQUEUE* finishedjobs; /**< finished jobs that are not yet collected */
    105 int currworkingthreads; /**< the threads currently processing jobs */
    106 SCIP_Bool blockwhenfull; /**< indicates that the queue can only be as large as nthreads */
    107 int currentid; /**< current job id */
    108
    109 /* Control indicators */
    110 SCIP_Bool shutdown; /**< indicates whether the pool needs to be shut down */
    111 SCIP_Bool queueopen; /**< indicates whether the queue is open */
    112
    113 /* mutex and locks for the thread pool */
    114 mtx_t poollock; /**< mutex to allow read and write of the pool features */
    115 cnd_t queuenotempty; /**< condition to broadcast the queue has jobs */
    116 cnd_t queuenotfull; /**< condition to broadcast the queue is not full */
    117 cnd_t queueempty; /**< condition to broadcast that the queue is empty */
    118 cnd_t jobfinished; /**< condition to broadcast that a job has been finished */
    119};
    120
    121/** this function controls the execution of each of the threads */
    122static
    124 void* threadnum /**< thread number is passed in as argument stored inside a void pointer */
    125 )
    126{
    127 SCIP_JOB* newjob;
    128 SCIP_JOB* prevjob;
    129 SCIP_JOB* currjob;
    130
    131 _threadnumber = (int)(uintptr_t) threadnum;
    132
    133 /* Increase the number of active threads */
    134 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
    135 _threadpool->currworkingthreads += 1;
    136 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
    137
    138 /* this is an endless loop that runs until the thrd_exit function is called */
    139 while( TRUE ) /*lint !e716*/
    140 {
    141 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
    142
    143 /* the queue is empty but the shutdown command has not been given */
    144 while( _threadpool->jobqueue->njobs == 0 && !_threadpool->shutdown )
    145 {
    146 SCIP_CALL( SCIPtnyWaitCondition(&(_threadpool->queuenotempty), &(_threadpool->poollock)) );
    147 }
    148
    149 /* if the shutdown command has been given, then exit the thread */
    150 if( _threadpool->shutdown )
    151 {
    152 /* Decrease the thread count when execution of job queue has completed */
    153 _threadpool->currworkingthreads -= 1;
    154 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
    155
    156 thrd_exit((int)SCIP_OKAY);
    157 }
    158
    159 /* getting the next job in the queue */
    160 newjob = _threadpool->jobqueue->firstjob;
    161 _threadpool->jobqueue->njobs--; /* decreasing the number of jobs in the queue */
    162
    163 if( _threadpool->jobqueue->njobs == 0 )
    164 {
    165 _threadpool->jobqueue->firstjob = NULL;
    166 _threadpool->jobqueue->lastjob = NULL;
    167 }
    168 else
    169 _threadpool->jobqueue->firstjob = newjob->nextjob; /* updating the queue */
    170
    171 /* if we want to wait when the queue is full, then we broadcast that the queue can now take new jobs */
    172 if( _threadpool->blockwhenfull &&
    173 _threadpool->jobqueue->njobs == _threadpool->queuesize - 1 )
    174 {
    176 }
    177
    178 /* indicating that the queue is empty */
    179 if( _threadpool->jobqueue->njobs == 0 )
    180 {
    182 }
    183
    184 /* updating the current job list */
    185 if( _threadpool->currentjobs->njobs == 0 )
    186 {
    187 _threadpool->currentjobs->firstjob = newjob;
    188 _threadpool->currentjobs->lastjob = newjob;
    189 }
    190 else
    191 {
    192 _threadpool->currentjobs->lastjob->nextjob = newjob;
    193 _threadpool->currentjobs->lastjob = newjob;
    194 }
    195
    196 _threadpool->currentjobs->njobs++;
    197
    198 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
    199
    200 /* setting the job to run on this thread */
    201 newjob->retcode = (*(newjob->jobfunc))(newjob->args);
    202
    203 /* setting the current job on this thread to NULL */
    204 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
    205
    206 /* finding the location of the processed job in the currentjobs queue */
    207 currjob = _threadpool->currentjobs->firstjob;
    208 prevjob = NULL;
    209
    210 while( currjob != newjob )
    211 {
    212 prevjob = currjob;
    213 currjob = prevjob->nextjob;
    214 }
    215
    216 /* removing the processed job from current jobs list */
    217 if( currjob == _threadpool->currentjobs->firstjob )
    218 _threadpool->currentjobs->firstjob = currjob->nextjob;
    219 else
    220 prevjob->nextjob = currjob->nextjob; /*lint !e794*/
    221
    222 if( currjob == _threadpool->currentjobs->lastjob )
    223 _threadpool->currentjobs->lastjob = prevjob;
    224
    225 _threadpool->currentjobs->njobs--;
    226
    227 /* updating the finished job list */
    228 if( _threadpool->finishedjobs->njobs == 0 )
    229 {
    230 _threadpool->finishedjobs->firstjob = newjob;
    231 _threadpool->finishedjobs->lastjob = newjob;
    232 }
    233 else
    234 {
    235 _threadpool->finishedjobs->lastjob->nextjob = newjob;
    236 _threadpool->finishedjobs->lastjob = newjob;
    237 }
    238
    239 _threadpool->finishedjobs->njobs++;
    240
    241 /* signalling that a job has been finished */
    242 SCIP_CALL( SCIPtnyBroadcastCondition(&(_threadpool)->jobfinished) );
    243
    244 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
    245 }
    246}
    247
    248/** this function controls the execution of each of the threads */
    249static
    251 void* threadnum /**< thread number is passed in as argument stored inside a void pointer */
    252 )
    253{
    254 return (int) threadPoolThreadRetcode(threadnum);
    255}
    256
    257/** creates a threadpool */
    258static
    260 SCIP_THREADPOOL** thrdpool, /**< pointer to store threadpool */
    261 int nthreads, /**< number of threads in the threadpool */
    262 int qsize, /**< maximum size of the jobqueue */
    263 SCIP_Bool blockwhenfull /**< should the jobqueue block if it is full */
    264 )
    265{
    266 uintptr_t i;
    267
    268 assert(nthreads >= 0);
    269 assert(qsize >= 0);
    270
    271 /* @todo think about the correct memory here */
    272 SCIP_ALLOC( BMSallocMemory(thrdpool) );
    273 (*thrdpool)->currentid = 0;
    274 (*thrdpool)->queuesize = qsize;
    275 (*thrdpool)->nthreads = nthreads;
    276 (*thrdpool)->blockwhenfull = blockwhenfull;
    277 (*thrdpool)->shutdown = FALSE;
    278 (*thrdpool)->queueopen = TRUE;
    279
    280 /* allocating memory for the job queue */
    281 SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->jobqueue) );
    282 (*thrdpool)->jobqueue->firstjob = NULL;
    283 (*thrdpool)->jobqueue->lastjob = NULL;
    284 (*thrdpool)->jobqueue->njobs = 0;
    285
    286 /* allocating memory for the job queue */
    287 SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->currentjobs) );
    288 (*thrdpool)->currentjobs->firstjob = NULL;
    289 (*thrdpool)->currentjobs->lastjob = NULL;
    290 (*thrdpool)->currentjobs->njobs = 0;
    291
    292 /* allocating memory for the job queue */
    293 SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->finishedjobs) );
    294 (*thrdpool)->finishedjobs->firstjob = NULL;
    295 (*thrdpool)->finishedjobs->lastjob = NULL;
    296 (*thrdpool)->finishedjobs->njobs = 0;
    297
    298 /* initialising the mutex */
    299 SCIP_CALL( SCIPtnyInitLock(&(*thrdpool)->poollock) ); /*lint !e2482*/
    300
    301 /* initialising the conditions */
    302 SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->queuenotempty) );
    303 SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->queuenotfull) );
    304 SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->queueempty) );
    305 SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->jobfinished) );
    306
    307 /* creating the threads */
    308 (*thrdpool)->currworkingthreads = 0;
    309
    310 /* allocating memory for the threads */
    311 SCIP_ALLOC( BMSallocMemoryArray(&((*thrdpool)->threads), nthreads) );
    312
    313 /* create the threads */
    314 for( i = 0; i < (unsigned)nthreads; i++ )
    315 {
    316 if( thrd_create(&((*thrdpool)->threads[i]), threadPoolThread, (void*)i) != thrd_success )
    317 return SCIP_ERROR;
    318 }
    319
    320 _threadnumber = nthreads;
    321 /* halt while all threads are not active TODO: is synchronization required here ? */
    322 /*TODO: this caused a deadlock, is it important to wait for all threads to start?
    323 * while( (*thrdpool)->currworkingthreads != nthreads )
    324 {}*/
    325
    326 return SCIP_OKAY;
    327}
    328
    329/** adding a job to the job queue.
    330 *
    331 * This gives some more flexibility in the handling of new jobs.
    332 * This function needs to be called from within a mutex.
    333 */
    334static
    336 SCIP_THREADPOOL* threadpool, /**< pointer to store threadpool */
    337 SCIP_JOB* newjob /**< pointer to new job */
    338 )
    339{
    340 /* @todo we want to work out what to do with a full job queue. Is there a problem if the limit is hit? */
    341 /* @note it is important to have a queuesize. This will stop the code submitting infinitely many jobs. */
    342 assert(threadpool->jobqueue->njobs < threadpool->queuesize);
    343
    344 newjob->nextjob = NULL;
    345
    346 /* checking the status of the job queue */
    347 if( threadpool->jobqueue->njobs == 0 )
    348 {
    349 threadpool->jobqueue->firstjob = newjob;
    350 threadpool->jobqueue->lastjob = newjob;
    351 }
    352 else /* it is assumed that the jobqueue is not full */
    353 {
    354 threadpool->jobqueue->lastjob->nextjob = newjob;
    355 threadpool->jobqueue->lastjob = newjob;
    356 }
    357
    358 /* signalling to all threads that the queue has jobs using the signal instead of broadcast because only one thread
    359 * should be awakened */
    361
    362 threadpool->jobqueue->njobs++;
    363}
    364
    365/** adds a job to the threadpool */
    366static
    368 SCIP_JOB* newjob, /**< job to add to threadpool */
    369 SCIP_SUBMITSTATUS* status /**< pointer to store the job's submit status */
    370 )
    371{
    372 assert(newjob != NULL);
    373 assert(_threadpool != NULL);
    374
    375 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
    376
    377 /* if the queue is full and we are blocking, then return an error. */
    378 if( _threadpool->jobqueue->njobs == _threadpool->queuesize && _threadpool->blockwhenfull )
    379 {
    380 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
    381 *status = SCIP_SUBMIT_QUEUEFULL;
    382 return SCIP_OKAY;
    383 }
    384
    385 /* Wait until the job queue is not full. If the queue is closed or the thread pool is shut down, then stop waiting. */
    386 /* @todo this needs to be checked. It is possible that a job can be submitted and then the queue is closed or the
    387 * thread pool is shut down. Need to work out the best way to handle this. */
    388 while( _threadpool->jobqueue->njobs == _threadpool->queuesize && !(_threadpool->shutdown || !_threadpool->queueopen) )
    389 {
    390 SCIP_CALL( SCIPtnyWaitCondition(&(_threadpool->queuenotfull), &(_threadpool->poollock)) );
    391 }
    392
    393 /* if the thread pool is shut down or the queue is closed, then we need to leave the job submission */
    394 if( !_threadpool->queueopen )
    395 {
    396 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
    397 *status = SCIP_SUBMIT_QUEUECLOSED;
    398 return SCIP_OKAY;
    399 }
    400 else if( _threadpool->shutdown )
    401 {
    402 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
    403 *status = SCIP_SUBMIT_SHUTDOWN;
    404 return SCIP_OKAY;
    405 }
    406
    407 /* creating the job for submission */
    408 newjob->nextjob = NULL;
    409
    410 /* adding the job to the queue */
    411 /* this can only happen if the queue is not full */
    412 assert(_threadpool->jobqueue->njobs != _threadpool->queuesize);
    413 jobQueueAddJob(_threadpool, newjob);
    414
    415 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
    416
    417 *status = SCIP_SUBMIT_SUCCESS;
    418
    419 return SCIP_OKAY;
    420}
    421
    422/** frees the jobqueue of the threadpool */
    423static
    425 SCIP_THREADPOOL* thrdpool /**< pointer to thread pool */
    426 )
    427{
    428 SCIP_JOB* currjob;
    429
    430 assert(!thrdpool->queueopen);
    431 assert(thrdpool->shutdown);
    432
    433 /* iterating through all jobs until all have been freed */
    434 while( thrdpool->jobqueue->firstjob != NULL )
    435 {
    436 currjob = thrdpool->jobqueue->firstjob->nextjob;
    437 thrdpool->jobqueue->firstjob = thrdpool->jobqueue->firstjob->nextjob;
    438 BMSfreeMemory(&currjob);
    439 }
    440
    441 assert(thrdpool->jobqueue->firstjob == NULL);
    442 assert(thrdpool->jobqueue->lastjob == NULL);
    443
    444 BMSfreeMemory(&thrdpool->jobqueue);
    445}
    446
    447/** free the thread pool */
    448static
    450 SCIP_THREADPOOL** thrdpool, /**< pointer to thread pool */
    451 SCIP_Bool finishjobs, /**< currently unused */
    452 SCIP_Bool completequeue /**< Wait until the queue has complete? */
    453 )
    454{
    455 int i;
    456 SCIP_RETCODE retcode;
    457
    458 /*TODO remove argument? */
    459 SCIP_UNUSED( finishjobs );
    460
    461 SCIP_CALL( SCIPtnyAcquireLock(&((*thrdpool)->poollock)) );
    462
    463 /* if the shutdown is already in progress, then we don't need to complete this function */
    464 if( !(*thrdpool)->queueopen || (*thrdpool)->shutdown )
    465 {
    466 SCIP_CALL( SCIPtnyReleaseLock(&((*thrdpool)->poollock)) );
    467
    468 return SCIP_OKAY;
    469 }
    470
    471 /* indicating that the job queue is now closed for new jobs */
    472 (*thrdpool)->queueopen = FALSE;
    473
    474 /* if the jobs in the queue should be completed, then we wait until the queueempty condition is set */
    475 if( completequeue )
    476 {
    477 while( (*thrdpool)->jobqueue->njobs > 0 )
    478 {
    479 SCIP_CALL( SCIPtnyWaitCondition(&((*thrdpool)->queueempty), &((*thrdpool)->poollock)) );
    480 }
    481 }
    482
    483 /* indicating that the tpi has commenced the shutdown process */
    484 (*thrdpool)->shutdown = TRUE;
    485
    486 SCIP_CALL( SCIPtnyReleaseLock(&((*thrdpool)->poollock)) );
    487
    488 /* waking up all threads so that they can check the shutdown condition;
    489 * this requires that the conditions queuenotempty and queuenotfull is broadcast
    490 */
    491 SCIP_CALL( SCIPtnyBroadcastCondition(&((*thrdpool)->queuenotempty)) );
    492 SCIP_CALL( SCIPtnyBroadcastCondition(&((*thrdpool)->queuenotfull)) );
    493
    494 retcode = SCIP_OKAY;
    495
    496 /* calling a join to ensure that all worker finish before the thread pool is closed */
    497 for( i = 0; i < (*thrdpool)->nthreads; i++ )
    498 {
    499 int thrdretcode;
    500
    501 if( thrd_join((*thrdpool)->threads[i], &thrdretcode) != thrd_success )
    502 retcode = (SCIP_RETCODE) MIN((int)SCIP_ERROR, (int)retcode);
    503 else
    504 retcode = (SCIP_RETCODE) MIN(thrdretcode, (int)retcode);
    505 }
    506
    507 /* freeing memory and data structures */
    508 BMSfreeMemoryArray(&(*thrdpool)->threads);
    509
    510 /* Freeing the current jobs list. This assumes that all jobs complete before the tpi is closed. */
    511 assert((*thrdpool)->currentjobs->njobs == 0);
    512 BMSfreeMemory(&(*thrdpool)->currentjobs);
    513 assert((*thrdpool)->finishedjobs->njobs == 0);
    514 BMSfreeMemory(&(*thrdpool)->finishedjobs);
    515
    516 freeJobQueue(*thrdpool);
    517
    518 /* destroying the conditions */
    519 SCIPtnyDestroyCondition(&(*thrdpool)->jobfinished);
    520 SCIPtnyDestroyCondition(&(*thrdpool)->queueempty);
    521 SCIPtnyDestroyCondition(&(*thrdpool)->queuenotfull);
    522 SCIPtnyDestroyCondition(&(*thrdpool)->queuenotempty);
    523
    524 /* destroying the mutex */
    525 SCIPtnyDestroyLock(&(*thrdpool)->poollock);
    526
    527 BMSfreeMemory(thrdpool);
    528
    529 return retcode;
    530}
    531
    532
    533/* checking a job queue */
    534static
    536 SCIP_JOBQUEUE* jobqueue, /**< pointer to the job queue */
    537 int jobid /**< id of job to check */
    538 )
    539{
    540 SCIP_JOB* currjob = jobqueue->firstjob;
    541
    542 /* checking the job ids */
    543 if( currjob != NULL )
    544 {
    545 while( currjob != jobqueue->lastjob )
    546 {
    547 if( currjob->jobid == jobid )
    548 return SCIP_JOB_INQUEUE;
    549
    550 currjob = currjob->nextjob;
    551 }
    552
    553 if( currjob->jobid == jobid )
    554 return SCIP_JOB_INQUEUE;
    555 }
    556
    558}
    559
    560/** returns whether the job id is running */
    561static
    563 SCIP_JOBQUEUE* currentjobs, /**< queue of current jobs */
    564 int jobid /**< id of job to check */
    565 )
    566{
    567 if( checkJobQueue(currentjobs, jobid) == SCIP_JOB_INQUEUE )
    568 return TRUE;
    569 else
    570 return FALSE;
    571}
    572
    573/** returns the number of threads */
    575 void
    576 )
    577{
    578 return _threadpool != NULL ? _threadpool->nthreads : 0;
    579}
    580
    581/** initializes tpi */
    583 int nthreads, /**< the number of threads to be used */
    584 int queuesize, /**< the size of the queue */
    585 SCIP_Bool blockwhenfull /**< should the queue block when full */
    586 )
    587{
    588 assert(_threadpool == NULL);
    589 SCIP_CALL( createThreadPool(&_threadpool, nthreads, queuesize, blockwhenfull) );
    590 return SCIP_OKAY;
    591}
    592
    593/** deinitializes tpi */
    595 void
    596 )
    597{
    598 assert(_threadpool != NULL);
    599
    600 SCIP_CALL( freeThreadPool(&_threadpool, TRUE, TRUE) );
    601
    602 return SCIP_OKAY;
    603}
    604
    605/** creates a job for parallel processing */
    607 SCIP_JOB** job, /**< pointer to the job that will be created */
    608 int jobid, /**< the id for the current job */
    609 SCIP_RETCODE (*jobfunc)(void* args),/**< pointer to the job function */
    610 void* jobarg /**< the job's argument */
    611 )
    612{
    614
    615 (*job)->jobid = jobid;
    616 (*job)->jobfunc = jobfunc;
    617 (*job)->args = jobarg;
    618 (*job)->nextjob = NULL;
    619
    620 return SCIP_OKAY;
    621}
    622
    623/** get a new job id for the new set of submitted jobs */
    625 void
    626 )
    627{
    628 int id;
    629 assert(_threadpool != NULL);
    630
    632 id = ++_threadpool->currentid;
    634
    635 return id;
    636}
    637
    638/** submit a job for parallel processing; the return value is a globally defined status */
    640 SCIP_JOB* job, /**< pointer to the job to be submitted */
    641 SCIP_SUBMITSTATUS* status /**< pointer to store the job's submit status */
    642 )
    643{
    644 assert(job != NULL);
    645
    646 /* the job id must be set before submitting the job. The submitter controls whether a new id is required. */
    647 assert(job->jobid == _threadpool->currentid);
    648 SCIP_CALL( threadPoolAddWork(job, status) );
    649
    650 return SCIP_OKAY;
    651}
    652
    653/** blocks until all jobs of the given jobid have finished
    654 * and then returns the smallest SCIP_RETCODE of all the jobs
    655 */
    657 int jobid /**< the jobid of the jobs to wait for */
    658 )
    659{
    660 SCIP_RETCODE retcode;
    661 SCIP_JOB* currjob;
    662 SCIP_JOB* prevjob;
    663
    664 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
    665
    666 while( isJobRunning(_threadpool->currentjobs, jobid) || isJobRunning(_threadpool->jobqueue, jobid) )
    667 {
    668 SCIP_CALL( SCIPtnyWaitCondition(&_threadpool->jobfinished, &_threadpool->poollock) );
    669 }
    670
    671 /* finding the location of the processed job in the currentjobs queue */
    672 retcode = SCIP_OKAY;
    673 currjob = _threadpool->finishedjobs->firstjob;
    674 prevjob = NULL;
    675
    676 while( currjob )
    677 {
    678 if( currjob->jobid == jobid )
    679 {
    680 SCIP_JOB* nextjob;
    681
    682 /* if the job has the right jobid collect its retcode,
    683 * remove it from the finished job list, and free it
    684 */
    685 retcode = MIN(retcode, currjob->retcode);
    686
    687 /* removing the finished job from finished jobs list */
    688 if( currjob == _threadpool->finishedjobs->firstjob )
    689 {
    690 _threadpool->finishedjobs->firstjob = currjob->nextjob;
    691 }
    692 else
    693 {
    694 assert(prevjob != NULL);
    695 prevjob->nextjob = currjob->nextjob;
    696 }
    697
    698 if( currjob == _threadpool->finishedjobs->lastjob )
    699 _threadpool->finishedjobs->lastjob = prevjob;
    700
    701 _threadpool->finishedjobs->njobs--;
    702
    703 /* update currjob and free finished job; prevjob stays the same */
    704 nextjob = currjob->nextjob;
    705 BMSfreeMemory(&currjob);
    706 currjob = nextjob;
    707 }
    708 else
    709 {
    710 /* otherwise leave job untouched */
    711 prevjob = currjob;
    712 currjob = prevjob->nextjob;
    713 }
    714 }
    715
    716 SCIP_CALL( SCIPtnyReleaseLock(&_threadpool->poollock) );
    717
    718 return retcode;
    719}
    720
    721
    722/*
    723 * locks
    724 */
    725
    726/** initializes the given lock */
    728 SCIP_LOCK** lock /**< the lock */
    729 )
    730{
    731 assert(lock != NULL);
    732
    733 SCIP_ALLOC( BMSallocMemory(lock) );
    734
    735 if( mtx_init(&(*lock)->lock, mtx_plain) == thrd_success )
    736 return SCIP_OKAY;
    737 else
    738 {
    739 BMSfreeMemory(lock);
    740 return SCIP_ERROR;
    741 }
    742}
    743
    744/** destroys the given lock */
    746 SCIP_LOCK** lock /**< the lock */
    747 )
    748{
    749 assert(lock != NULL);
    750
    751 mtx_destroy(&(*lock)->lock);
    752 BMSfreeMemory(lock);
    753}
    754
    755/** acquires the given lock */
    757 SCIP_LOCK* lock /**< the lock */
    758 )
    759{
    760 if( mtx_lock(&lock->lock) == thrd_success )
    761 return SCIP_OKAY;
    762 return SCIP_ERROR;
    763}
    764
    765/** releases the given lock */
    767 SCIP_LOCK* lock /**< the lock */
    768 )
    769{
    770 if( mtx_unlock(&lock->lock) == thrd_success )
    771 return SCIP_OKAY;
    772 return SCIP_ERROR;
    773}
    774
    775
    776/*
    777 * conditions
    778 */
    779
    780/** initializes the given condition variable */
    782 SCIP_CONDITION** condition /**< condition to be created and initialized */
    783 )
    784{
    785 assert(condition != NULL);
    786
    787 SCIP_ALLOC( BMSallocMemory(condition) );
    788
    789 if( cnd_init(&(*condition)->condition) == thrd_success )
    790 return SCIP_OKAY;
    791 return SCIP_ERROR;
    792}
    793
    794/** destroys the given condition variable */
    796 SCIP_CONDITION** condition /**< condition to be destroyed and freed */
    797 )
    798{
    799 cnd_destroy(&(*condition)->condition);
    800 BMSfreeMemory(condition);
    801}
    802
    803/** signals one waiting thread */
    805 SCIP_CONDITION* condition /**< the condition variable to signal */
    806 )
    807{
    808 if( cnd_signal(&condition->condition) == thrd_success )
    809 return SCIP_OKAY;
    810 return SCIP_ERROR;
    811}
    812
    813/** signals all waiting threads */
    814SCIP_EXPORT
    816 SCIP_CONDITION* condition /**< the condition variable to broadcast */
    817 )
    818{
    819 if( cnd_broadcast(&condition->condition) == thrd_success )
    820 return SCIP_OKAY;
    821 return SCIP_ERROR;
    822}
    823
    824/** waits on a condition variable. The given lock must be held by the caller and will
    825 * be held when this function returns.
    826 */
    828 SCIP_CONDITION* condition, /**< the condition variable to wait on */
    829 SCIP_LOCK* lock /**< the lock that is held by the caller */
    830 )
    831{
    832 if( cnd_wait(&condition->condition, &lock->lock) == thrd_success )
    833 return SCIP_OKAY;
    834 return SCIP_ERROR;
    835}
    836
    837/** returns the thread number */
    839 void
    840 )
    841{
    842 return _threadnumber;
    843}
    844
    845/** indicate whether a working TPI is available */
    847{
    848 return TRUE;
    849}
    850
    851/** get name of library that the TPI interfaces to */
    853 char* name, /**< buffer to store name */
    854 int namesize /**< length of name buffer */
    855 )
    856{
    857 assert(name != NULL);
    858
    859 (void) SCIPsnprintf(name, namesize, "TinyCThread %d.%d", TINYCTHREAD_VERSION_MAJOR, TINYCTHREAD_VERSION_MINOR);
    860}
    861
    862/** get description of library that the TPI interfaces to */
    864 char* desc, /**< buffer to store description */
    865 int descsize /**< length of description */
    866 )
    867{
    868 assert(desc != NULL);
    869
    870 (void) SCIPsnprintf(desc, descsize, "small portable implementation of the C11 threads API (tinycthread.github.io)");
    871}
    #define NULL
    Definition: def.h:248
    #define SCIP_UNUSED(x)
    Definition: def.h:409
    #define SCIP_Bool
    Definition: def.h:91
    #define MIN(x, y)
    Definition: def.h:224
    #define SCIP_ALLOC(x)
    Definition: def.h:366
    #define TRUE
    Definition: def.h:93
    #define FALSE
    Definition: def.h:94
    #define SCIP_CALL_ABORT(x)
    Definition: def.h:334
    #define SCIP_CALL(x)
    Definition: def.h:355
    int SCIPsnprintf(char *t, int len, const char *s,...)
    Definition: misc.c:10827
    memory allocation routines
    #define BMSfreeMemory(ptr)
    Definition: memory.h:145
    #define BMSallocMemoryArray(ptr, num)
    Definition: memory.h:123
    #define BMSfreeMemoryArray(ptr)
    Definition: memory.h:147
    #define BMSallocMemory(ptr)
    Definition: memory.h:118
    public methods for message output
    public data structures and miscellaneous methods
    SCIP_JOB * lastjob
    Definition: tpi_openmp.c:85
    SCIP_JOB * firstjob
    Definition: tpi_openmp.c:84
    SCIP_RETCODE retcode
    Definition: tpi_openmp.c:78
    struct SCIP_Job * nextjob
    Definition: tpi_openmp.c:75
    void * args
    Definition: tpi_openmp.c:77
    SCIP_RETCODE(* jobfunc)(void *args)
    Definition: tpi_openmp.c:76
    int jobid
    Definition: tpi_openmp.c:74
    mtx_t lock
    Definition: tpi_tnycthrd.c:59
    omp_lock_t lock
    Definition: tpi_openmp.c:58
    SCIP_Bool queueopen
    Definition: tpi_tnycthrd.c:111
    SCIP_JOBQUEUE * jobqueue
    Definition: tpi_tnycthrd.c:101
    SCIP_Bool shutdown
    Definition: tpi_tnycthrd.c:110
    SCIP_Bool blockwhenfull
    Definition: tpi_tnycthrd.c:106
    SCIP_JOBQUEUE * currentjobs
    Definition: tpi_tnycthrd.c:102
    thrd_t * threads
    Definition: tpi_tnycthrd.c:100
    SCIP_JOBQUEUE * finishedjobs
    Definition: tpi_tnycthrd.c:104
    the type definitions for the SCIP parallel interface
    static SCIP_RETCODE threadPoolAddWork(SCIP_JOB *newjob, SCIP_SUBMITSTATUS *status)
    Definition: tpi_tnycthrd.c:367
    static SCIP_JOBSTATUS checkJobQueue(SCIP_JOBQUEUE *jobqueue, int jobid)
    Definition: tpi_tnycthrd.c:535
    SCIP_Bool SCIPtpiIsAvailable(void)
    Definition: tpi_tnycthrd.c:846
    SCIP_RETCODE SCIPtpiWaitCondition(SCIP_CONDITION *condition, SCIP_LOCK *lock)
    Definition: tpi_tnycthrd.c:827
    SCIP_RETCODE SCIPtpiCreateJob(SCIP_JOB **job, int jobid, SCIP_RETCODE(*jobfunc)(void *args), void *jobarg)
    Definition: tpi_tnycthrd.c:606
    static SCIP_RETCODE threadPoolThreadRetcode(void *threadnum)
    Definition: tpi_tnycthrd.c:123
    SCIP_RETCODE SCIPtpiSignalCondition(SCIP_CONDITION *condition)
    Definition: tpi_tnycthrd.c:804
    SCIP_RETCODE SCIPtpiAcquireLock(SCIP_LOCK *lock)
    Definition: tpi_tnycthrd.c:756
    #define SCIPtnyInitCondition(condition)
    Definition: tpi_tnycthrd.c:50
    static SCIP_RETCODE createThreadPool(SCIP_THREADPOOL **thrdpool, int nthreads, int qsize, SCIP_Bool blockwhenfull)
    Definition: tpi_tnycthrd.c:259
    static void jobQueueAddJob(SCIP_THREADPOOL *threadpool, SCIP_JOB *newjob)
    Definition: tpi_tnycthrd.c:335
    SCIP_RETCODE SCIPtpiExit(void)
    Definition: tpi_tnycthrd.c:594
    #define SCIPtnyInitLock(lock)
    Definition: tpi_tnycthrd.c:44
    SCIP_RETCODE SCIPtpiBroadcastCondition(SCIP_CONDITION *condition)
    Definition: tpi_tnycthrd.c:815
    #define SCIPtnyBroadcastCondition(condition)
    Definition: tpi_tnycthrd.c:53
    SCIP_RETCODE SCIPtpiSubmitJob(SCIP_JOB *job, SCIP_SUBMITSTATUS *status)
    Definition: tpi_tnycthrd.c:639
    static SCIP_Bool isJobRunning(SCIP_JOBQUEUE *currentjobs, int jobid)
    Definition: tpi_tnycthrd.c:562
    void SCIPtpiDestroyLock(SCIP_LOCK **lock)
    Definition: tpi_tnycthrd.c:745
    void SCIPtpiGetLibraryDesc(char *desc, int descsize)
    Definition: tpi_tnycthrd.c:863
    SCIP_RETCODE SCIPtpiCollectJobs(int jobid)
    Definition: tpi_tnycthrd.c:656
    #define SCIPtnyWaitCondition(condition, lock)
    Definition: tpi_tnycthrd.c:54
    #define SCIPtnyReleaseLock(lock)
    Definition: tpi_tnycthrd.c:47
    #define SCIPtnyDestroyCondition(condition)
    Definition: tpi_tnycthrd.c:51
    #define SCIPtnySignalCondition(condition)
    Definition: tpi_tnycthrd.c:52
    int SCIPtpiGetThreadNum(void)
    Definition: tpi_tnycthrd.c:838
    int SCIPtpiGetNumThreads(void)
    Definition: tpi_tnycthrd.c:574
    void SCIPtpiDestroyCondition(SCIP_CONDITION **condition)
    Definition: tpi_tnycthrd.c:795
    int SCIPtpiGetNewJobID(void)
    Definition: tpi_tnycthrd.c:624
    #define SCIPtnyDestroyLock(lock)
    Definition: tpi_tnycthrd.c:45
    static int threadPoolThread(void *threadnum)
    Definition: tpi_tnycthrd.c:250
    void SCIPtpiGetLibraryName(char *name, int namesize)
    Definition: tpi_tnycthrd.c:852
    SCIP_RETCODE SCIPtpiInitLock(SCIP_LOCK **lock)
    Definition: tpi_tnycthrd.c:727
    static SCIP_RETCODE freeThreadPool(SCIP_THREADPOOL **thrdpool, SCIP_Bool finishjobs, SCIP_Bool completequeue)
    Definition: tpi_tnycthrd.c:449
    static void freeJobQueue(SCIP_THREADPOOL *thrdpool)
    Definition: tpi_tnycthrd.c:424
    SCIP_RETCODE SCIPtpiReleaseLock(SCIP_LOCK *lock)
    Definition: tpi_tnycthrd.c:766
    SCIP_RETCODE SCIPtpiInitCondition(SCIP_CONDITION **condition)
    Definition: tpi_tnycthrd.c:781
    #define SCIPtnyAcquireLock(lock)
    Definition: tpi_tnycthrd.c:46
    SCIP_RETCODE SCIPtpiInit(int nthreads, int queuesize, SCIP_Bool blockwhenfull)
    Definition: tpi_tnycthrd.c:582
    @ SCIP_OKAY
    Definition: type_retcode.h:42
    @ SCIP_ERROR
    Definition: type_retcode.h:43
    enum SCIP_Retcode SCIP_RETCODE
    Definition: type_retcode.h:63
    enum SCIP_Submitstatus SCIP_SUBMITSTATUS
    Definition: type_tpi.h:50
    @ SCIP_JOB_DOESNOTEXIST
    Definition: type_tpi.h:60
    @ SCIP_JOB_INQUEUE
    Definition: type_tpi.h:61
    enum SCIP_Jobstatus SCIP_JOBSTATUS
    Definition: type_tpi.h:65
    @ SCIP_SUBMIT_SUCCESS
    Definition: type_tpi.h:48
    @ SCIP_SUBMIT_SHUTDOWN
    Definition: type_tpi.h:47
    @ SCIP_SUBMIT_QUEUEFULL
    Definition: type_tpi.h:45
    @ SCIP_SUBMIT_QUEUECLOSED
    Definition: type_tpi.h:46