Scippy

SCIP

Solving Constraint Integer Programs

tpi_tnycthrd.c
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and library */
4 /* SCIP --- Solving Constraint Integer Programs */
5 /* */
6 /* Copyright (c) 2002-2024 Zuse Institute Berlin (ZIB) */
7 /* */
8 /* Licensed under the Apache License, Version 2.0 (the "License"); */
9 /* you may not use this file except in compliance with the License. */
10 /* You may obtain a copy of the License at */
11 /* */
12 /* http://www.apache.org/licenses/LICENSE-2.0 */
13 /* */
14 /* Unless required by applicable law or agreed to in writing, software */
15 /* distributed under the License is distributed on an "AS IS" BASIS, */
16 /* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
17 /* See the License for the specific language governing permissions and */
18 /* limitations under the License. */
19 /* */
20 /* You should have received a copy of the Apache-2.0 license */
21 /* along with SCIP; see the file LICENSE. If not visit scipopt.org. */
22 /* */
23 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
24 
25 /**@file tpi_tnycthrd.c
26  * @ingroup TASKINTERFACE
27  * @brief a TPI implementation using tinycthreads
28  * @author Stephen J. Maher
29  * @author Leona Gottwald
30  * @author Marc Pfetsch
31  */
32 
33 /*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
34 
35 #include "tpi/tpi.h"
36 #include "blockmemshell/memory.h"
37 #include "tinycthread/tinycthread.h"
38 #include "scip/pub_message.h"
39 
/* thin wrappers around the tinycthread primitives; the wrappers that yield a status map thrd_success to SCIP_OKAY
 * and every other result to SCIP_ERROR
 */

/* mutex wrappers */
#define SCIPtnyInitLock(lock)                 ( (mtx_init((lock), mtx_plain) == thrd_success) ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyDestroyLock(lock)              ( mtx_destroy((lock)) )
#define SCIPtnyAcquireLock(lock)              ( (mtx_lock((lock)) == thrd_success) ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyReleaseLock(lock)              ( (mtx_unlock((lock)) == thrd_success) ? SCIP_OKAY : SCIP_ERROR )

/* condition variable wrappers */
#define SCIPtnyInitCondition(condition)       ( (cnd_init((condition)) == thrd_success) ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyDestroyCondition(condition)    ( cnd_destroy((condition)) )
#define SCIPtnySignalCondition(condition)     ( (cnd_signal((condition)) == thrd_success) ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyBroadcastCondition(condition)  ( (cnd_broadcast((condition)) == thrd_success) ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyWaitCondition(condition, lock) ( (cnd_wait((condition), (lock)) == thrd_success) ? SCIP_OKAY : SCIP_ERROR )
54 
55 /** struct containing lock */
56 struct SCIP_Lock
57 {
58  mtx_t lock;
59 };
60 
61 /** struct containing condition */
62 struct SCIP_Condition
63 {
64  cnd_t condition;
65 };
66 
67 
69 static SCIP_THREADPOOL* _threadpool = NULL;
70 _Thread_local int _threadnumber; /*lint !e129*/
71 
72 /** A job added to the queue */
73 struct SCIP_Job
74 {
75  int jobid; /**< id to identify jobs from a common process */
76  struct SCIP_Job* nextjob; /**< pointer to the next job in the queue */
77  SCIP_RETCODE (*jobfunc)(void* args);/**< pointer to the job function */
78  void* args; /**< pointer to the function arguments */
79  SCIP_RETCODE retcode; /**< return code of the job */
80 };
81 
82 /** the thread pool job queue */
83 struct SCIP_JobQueue
84 {
85  SCIP_JOB* firstjob; /**< pointer to the first job in the queue */
86  SCIP_JOB* lastjob; /**< pointer to the last job in the queue */
87  int njobs; /**< number of jobs in the queue */
88 };
90 
91 /** The thread pool */
93 {
94  /* Pool Characteristics */
95  int nthreads; /**< number of threads in the pool */
96  int queuesize; /**< the total number of items to enter the queue */
97 
98  /* Current pool state */
99  thrd_t* threads; /**< the threads included in the pool */
100  SCIP_JOBQUEUE* jobqueue; /**< the job queue */
101  SCIP_JOBQUEUE* currentjobs; /**< the jobs currently being processed on a thread;
102  * only a single job is allowed per thread. */
103  SCIP_JOBQUEUE* finishedjobs; /**< finished jobs that are not yet collected */
104  int currworkingthreads; /**< the threads currently processing jobs */
105  SCIP_Bool blockwhenfull; /**< indicates that the queue can only be as large as nthreads */
106  int currentid; /**< current job id */
107 
108  /* Control indicators */
109  SCIP_Bool shutdown; /**< indicates whether the pool needs to be shut down */
110  SCIP_Bool queueopen; /**< indicates whether the queue is open */
111 
112  /* mutex and locks for the thread pool */
113  mtx_t poollock; /**< mutex to allow read and write of the pool features */
114  cnd_t queuenotempty; /**< condition to broadcast the queue has jobs */
115  cnd_t queuenotfull; /**< condition to broadcast the queue is not full */
116  cnd_t queueempty; /**< condition to broadcast that the queue is empty */
117  cnd_t jobfinished; /**< condition to broadcast that a job has been finished */
118 };
119 
120 /** this function controls the execution of each of the threads */
121 static
123  void* threadnum /**< thread number is passed in as argument stored inside a void pointer */
124  )
125 {
126  SCIP_JOB* newjob;
127  SCIP_JOB* prevjob;
128  SCIP_JOB* currjob;
129 
130  _threadnumber = (int)(uintptr_t) threadnum;
131 
132  /* Increase the number of active threads */
133  SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
134  _threadpool->currworkingthreads += 1;
135  SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
136 
137  /* this is an endless loop that runs until the thrd_exit function is called */
138  while( TRUE ) /*lint !e716*/
139  {
140  SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
141 
142  /* the queue is empty but the shutdown command has not been given */
143  while( _threadpool->jobqueue->njobs == 0 && !_threadpool->shutdown )
144  {
145  SCIP_CALL( SCIPtnyWaitCondition(&(_threadpool->queuenotempty), &(_threadpool->poollock)) );
146  }
147 
148  /* if the shutdown command has been given, then exit the thread */
149  if( _threadpool->shutdown )
150  {
151  /* Decrease the thread count when execution of job queue has completed */
152  _threadpool->currworkingthreads -= 1;
153  SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
154 
155  thrd_exit((int)SCIP_OKAY);
156  }
157 
158  /* getting the next job in the queue */
159  newjob = _threadpool->jobqueue->firstjob;
160  _threadpool->jobqueue->njobs--; /* decreasing the number of jobs in the queue */
161 
162  if( _threadpool->jobqueue->njobs == 0 )
163  {
164  _threadpool->jobqueue->firstjob = NULL;
165  _threadpool->jobqueue->lastjob = NULL;
166  }
167  else
168  _threadpool->jobqueue->firstjob = newjob->nextjob; /* updating the queue */
169 
170  /* if we want to wait when the queue is full, then we broadcast that the queue can now take new jobs */
171  if( _threadpool->blockwhenfull &&
172  _threadpool->jobqueue->njobs == _threadpool->queuesize - 1 )
173  {
175  }
176 
177  /* indicating that the queue is empty */
178  if( _threadpool->jobqueue->njobs == 0 )
179  {
180  SCIP_CALL( SCIPtnyBroadcastCondition(&(_threadpool->queueempty)) );
181  }
182 
183  /* updating the current job list */
184  if( _threadpool->currentjobs->njobs == 0 )
185  {
186  _threadpool->currentjobs->firstjob = newjob;
187  _threadpool->currentjobs->lastjob = newjob;
188  }
189  else
190  {
191  _threadpool->currentjobs->lastjob->nextjob = newjob;
192  _threadpool->currentjobs->lastjob = newjob;
193  }
194 
195  _threadpool->currentjobs->njobs++;
196 
197  SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
198 
199  /* setting the job to run on this thread */
200  newjob->retcode = (*(newjob->jobfunc))(newjob->args);
201 
202  /* setting the current job on this thread to NULL */
203  SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
204 
205  /* finding the location of the processed job in the currentjobs queue */
206  currjob = _threadpool->currentjobs->firstjob;
207  prevjob = NULL;
208 
209  while( currjob != newjob )
210  {
211  prevjob = currjob;
212  currjob = prevjob->nextjob;
213  }
214 
215  /* removing the processed job from current jobs list */
216  if( currjob == _threadpool->currentjobs->firstjob )
217  _threadpool->currentjobs->firstjob = currjob->nextjob;
218  else
219  prevjob->nextjob = currjob->nextjob; /*lint !e794*/
220 
221  if( currjob == _threadpool->currentjobs->lastjob )
222  _threadpool->currentjobs->lastjob = prevjob;
223 
224  _threadpool->currentjobs->njobs--;
225 
226  /* updating the finished job list */
227  if( _threadpool->finishedjobs->njobs == 0 )
228  {
229  _threadpool->finishedjobs->firstjob = newjob;
230  _threadpool->finishedjobs->lastjob = newjob;
231  }
232  else
233  {
234  _threadpool->finishedjobs->lastjob->nextjob = newjob;
235  _threadpool->finishedjobs->lastjob = newjob;
236  }
237 
238  _threadpool->finishedjobs->njobs++;
239 
240  /* signalling that a job has been finished */
241  SCIP_CALL( SCIPtnyBroadcastCondition(&(_threadpool)->jobfinished) );
242 
243  SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
244  }
245 }
246 
247 /** this function controls the execution of each of the threads */
248 static
250  void* threadnum /**< thread number is passed in as argument stored inside a void pointer */
251  )
252 {
253  return (int) threadPoolThreadRetcode(threadnum);
254 }
255 
256 /** creates a threadpool */
257 static
259  SCIP_THREADPOOL** thrdpool, /**< pointer to store threadpool */
260  int nthreads, /**< number of threads in the threadpool */
261  int qsize, /**< maximum size of the jobqueue */
262  SCIP_Bool blockwhenfull /**< should the jobqueue block if it is full */
263  )
264 {
265  uintptr_t i;
266 
267  assert(nthreads >= 0);
268  assert(qsize >= 0);
269 
270  /* @todo think about the correct memory here */
271  SCIP_ALLOC( BMSallocMemory(thrdpool) );
272  (*thrdpool)->currentid = 0;
273  (*thrdpool)->queuesize = qsize;
274  (*thrdpool)->nthreads = nthreads;
275  (*thrdpool)->blockwhenfull = blockwhenfull;
276  (*thrdpool)->shutdown = FALSE;
277  (*thrdpool)->queueopen = TRUE;
278 
279  /* allocating memory for the job queue */
280  SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->jobqueue) );
281  (*thrdpool)->jobqueue->firstjob = NULL;
282  (*thrdpool)->jobqueue->lastjob = NULL;
283  (*thrdpool)->jobqueue->njobs = 0;
284 
285  /* allocating memory for the job queue */
286  SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->currentjobs) );
287  (*thrdpool)->currentjobs->firstjob = NULL;
288  (*thrdpool)->currentjobs->lastjob = NULL;
289  (*thrdpool)->currentjobs->njobs = 0;
290 
291  /* allocating memory for the job queue */
292  SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->finishedjobs) );
293  (*thrdpool)->finishedjobs->firstjob = NULL;
294  (*thrdpool)->finishedjobs->lastjob = NULL;
295  (*thrdpool)->finishedjobs->njobs = 0;
296 
297  /* initialising the mutex */
298  SCIP_CALL( SCIPtnyInitLock(&(*thrdpool)->poollock) ); /*lint !e2482*/
299 
300  /* initialising the conditions */
301  SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->queuenotempty) );
302  SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->queuenotfull) );
303  SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->queueempty) );
304  SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->jobfinished) );
305 
306  /* creating the threads */
307  (*thrdpool)->currworkingthreads = 0;
308 
309  /* allocating memory for the threads */
310  SCIP_ALLOC( BMSallocMemoryArray(&((*thrdpool)->threads), nthreads) );
311 
312  /* create the threads */
313  for( i = 0; i < (unsigned)nthreads; i++ )
314  {
315  if( thrd_create(&((*thrdpool)->threads[i]), threadPoolThread, (void*)i) != thrd_success )
316  return SCIP_ERROR;
317  }
318 
319  _threadnumber = nthreads;
320  /* halt while all threads are not active TODO: is synchronization required here ? */
321  /*TODO: this caused a deadlock, is it important to wait for all threads to start?
322  * while( (*thrdpool)->currworkingthreads != nthreads )
323  {}*/
324 
325  return SCIP_OKAY;
326 }
327 
328 /** adding a job to the job queue.
329  *
330  * This gives some more flexibility in the handling of new jobs.
331  * This function needs to be called from within a mutex.
332  */
333 static
335  SCIP_THREADPOOL* threadpool, /**< pointer to store threadpool */
336  SCIP_JOB* newjob /**< pointer to new job */
337  )
338 {
339  /* @todo we want to work out what to do with a full job queue. Is there a problem if the limit is hit? */
340  /* @note it is important to have a queuesize. This will stop the code submitting infinitely many jobs. */
341  assert(threadpool->jobqueue->njobs < threadpool->queuesize);
342 
343  newjob->nextjob = NULL;
344 
345  /* checking the status of the job queue */
346  if( threadpool->jobqueue->njobs == 0 )
347  {
348  threadpool->jobqueue->firstjob = newjob;
349  threadpool->jobqueue->lastjob = newjob;
350  }
351  else /* it is assumed that the jobqueue is not full */
352  {
353  threadpool->jobqueue->lastjob->nextjob = newjob;
354  threadpool->jobqueue->lastjob = newjob;
355  }
356 
357  /* signalling to all threads that the queue has jobs using the signal instead of broadcast because only one thread
358  * should be awakened */
360 
361  threadpool->jobqueue->njobs++;
362 }
363 
364 /** adds a job to the threadpool */
365 static
367  SCIP_JOB* newjob, /**< job to add to threadpool */
368  SCIP_SUBMITSTATUS* status /**< pointer to store the job's submit status */
369  )
370 {
371  assert(newjob != NULL);
372  assert(_threadpool != NULL);
373 
374  SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
375 
376  /* if the queue is full and we are blocking, then return an error. */
377  if( _threadpool->jobqueue->njobs == _threadpool->queuesize && _threadpool->blockwhenfull )
378  {
379  SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
380  *status = SCIP_SUBMIT_QUEUEFULL;
381  return SCIP_OKAY;
382  }
383 
384  /* Wait until the job queue is not full. If the queue is closed or the thread pool is shut down, then stop waiting. */
385  /* @todo this needs to be checked. It is possible that a job can be submitted and then the queue is closed or the
386  * thread pool is shut down. Need to work out the best way to handle this. */
387  while( _threadpool->jobqueue->njobs == _threadpool->queuesize && !(_threadpool->shutdown || !_threadpool->queueopen) )
388  {
389  SCIP_CALL( SCIPtnyWaitCondition(&(_threadpool->queuenotfull), &(_threadpool->poollock)) );
390  }
391 
392  /* if the thread pool is shut down or the queue is closed, then we need to leave the job submission */
393  if( !_threadpool->queueopen )
394  {
395  SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
396  *status = SCIP_SUBMIT_QUEUECLOSED;
397  return SCIP_OKAY;
398  }
399  else if( _threadpool->shutdown )
400  {
401  SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
402  *status = SCIP_SUBMIT_SHUTDOWN;
403  return SCIP_OKAY;
404  }
405 
406  /* creating the job for submission */
407  newjob->nextjob = NULL;
408 
409  /* adding the job to the queue */
410  /* this can only happen if the queue is not full */
411  assert(_threadpool->jobqueue->njobs != _threadpool->queuesize);
412  jobQueueAddJob(_threadpool, newjob);
413 
414  SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
415 
416  *status = SCIP_SUBMIT_SUCCESS;
417 
418  return SCIP_OKAY;
419 }
420 
421 /** frees the jobqueue of the threadpool */
422 static
424  SCIP_THREADPOOL* thrdpool /**< pointer to thread pool */
425  )
426 {
427  SCIP_JOB* currjob;
428 
429  assert(!thrdpool->queueopen);
430  assert(thrdpool->shutdown);
431 
432  /* iterating through all jobs until all have been freed */
433  while( thrdpool->jobqueue->firstjob != NULL )
434  {
435  currjob = thrdpool->jobqueue->firstjob->nextjob;
436  thrdpool->jobqueue->firstjob = thrdpool->jobqueue->firstjob->nextjob;
437  BMSfreeMemory(&currjob);
438  }
439 
440  assert(thrdpool->jobqueue->firstjob == NULL);
441  assert(thrdpool->jobqueue->lastjob == NULL);
442 
443  BMSfreeMemory(&thrdpool->jobqueue);
444 }
445 
446 /** free the thread pool */
447 static
449  SCIP_THREADPOOL** thrdpool, /**< pointer to thread pool */
450  SCIP_Bool finishjobs, /**< currently unused */
451  SCIP_Bool completequeue /**< Wait until the queue has complete? */
452  )
453 {
454  int i;
455  SCIP_RETCODE retcode;
456 
457  /*TODO remove argument? */
458  SCIP_UNUSED( finishjobs );
459 
460  SCIP_CALL( SCIPtnyAcquireLock(&((*thrdpool)->poollock)) );
461 
462  /* if the shutdown is already in progress, then we don't need to complete this function */
463  if( !(*thrdpool)->queueopen || (*thrdpool)->shutdown )
464  {
465  SCIP_CALL( SCIPtnyReleaseLock(&((*thrdpool)->poollock)) );
466 
467  return SCIP_OKAY;
468  }
469 
470  /* indicating that the job queue is now closed for new jobs */
471  (*thrdpool)->queueopen = FALSE;
472 
473  /* if the jobs in the queue should be completed, then we wait until the queueempty condition is set */
474  if( completequeue )
475  {
476  while( (*thrdpool)->jobqueue->njobs > 0 )
477  {
478  SCIP_CALL( SCIPtnyWaitCondition(&((*thrdpool)->queueempty), &((*thrdpool)->poollock)) );
479  }
480  }
481 
482  /* indicating that the tpi has commenced the shutdown process */
483  (*thrdpool)->shutdown = TRUE;
484 
485  SCIP_CALL( SCIPtnyReleaseLock(&((*thrdpool)->poollock)) );
486 
487  /* waking up all threads so that they can check the shutdown condition;
488  * this requires that the conditions queuenotempty and queuenotfull is broadcast
489  */
490  SCIP_CALL( SCIPtnyBroadcastCondition(&((*thrdpool)->queuenotempty)) );
491  SCIP_CALL( SCIPtnyBroadcastCondition(&((*thrdpool)->queuenotfull)) );
492 
493  retcode = SCIP_OKAY;
494 
495  /* calling a join to ensure that all worker finish before the thread pool is closed */
496  for( i = 0; i < (*thrdpool)->nthreads; i++ )
497  {
498  int thrdretcode;
499 
500  if( thrd_join((*thrdpool)->threads[i], &thrdretcode) != thrd_success )
501  retcode = (SCIP_RETCODE) MIN((int)SCIP_ERROR, (int)retcode);
502  else
503  retcode = (SCIP_RETCODE) MIN(thrdretcode, (int)retcode);
504  }
505 
506  /* freeing memory and data structures */
507  BMSfreeMemoryArray(&(*thrdpool)->threads);
508 
509  /* Freeing the current jobs list. This assumes that all jobs complete before the tpi is closed. */
510  assert((*thrdpool)->currentjobs->njobs == 0);
511  BMSfreeMemory(&(*thrdpool)->currentjobs);
512  assert((*thrdpool)->finishedjobs->njobs == 0);
513  BMSfreeMemory(&(*thrdpool)->finishedjobs);
514 
515  freeJobQueue(*thrdpool);
516 
517  /* destroying the conditions */
518  SCIPtnyDestroyCondition(&(*thrdpool)->jobfinished);
519  SCIPtnyDestroyCondition(&(*thrdpool)->queueempty);
520  SCIPtnyDestroyCondition(&(*thrdpool)->queuenotfull);
521  SCIPtnyDestroyCondition(&(*thrdpool)->queuenotempty);
522 
523  /* destroying the mutex */
524  SCIPtnyDestroyLock(&(*thrdpool)->poollock);
525 
526  BMSfreeMemory(thrdpool);
527 
528  return retcode;
529 }
530 
531 
532 /* checking a job queue */
533 static
535  SCIP_JOBQUEUE* jobqueue, /**< pointer to the job queue */
536  int jobid /**< id of job to check */
537  )
538 {
539  SCIP_JOB* currjob = jobqueue->firstjob;
540 
541  /* checking the job ids */
542  if( currjob != NULL )
543  {
544  while( currjob != jobqueue->lastjob )
545  {
546  if( currjob->jobid == jobid )
547  return SCIP_JOB_INQUEUE;
548 
549  currjob = currjob->nextjob;
550  }
551 
552  if( currjob->jobid == jobid )
553  return SCIP_JOB_INQUEUE;
554  }
555 
556  return SCIP_JOB_DOESNOTEXIST;
557 }
558 
559 /** returns whether the job id is running */
560 static
562  SCIP_JOBQUEUE* currentjobs, /**< queue of current jobs */
563  int jobid /**< id of job to check */
564  )
565 {
566  if( checkJobQueue(currentjobs, jobid) == SCIP_JOB_INQUEUE )
567  return TRUE;
568  else
569  return FALSE;
570 }
571 
572 /** returns the number of threads */
574  void
575  )
576 {
577  return _threadpool->nthreads;
578 }
579 
580 /** initializes tpi */
582  int nthreads, /**< the number of threads to be used */
583  int queuesize, /**< the size of the queue */
584  SCIP_Bool blockwhenfull /**< should the queue block when full */
585  )
586 {
587  assert(_threadpool == NULL);
588  SCIP_CALL( createThreadPool(&_threadpool, nthreads, queuesize, blockwhenfull) );
589  return SCIP_OKAY;
590 }
591 
592 /** deinitializes tpi */
594  void
595  )
596 {
597  assert(_threadpool != NULL);
598 
599  SCIP_CALL( freeThreadPool(&_threadpool, TRUE, TRUE) );
600 
601  return SCIP_OKAY;
602 }
603 
604 /** creates a job for parallel processing */
606  SCIP_JOB** job, /**< pointer to the job that will be created */
607  int jobid, /**< the id for the current job */
608  SCIP_RETCODE (*jobfunc)(void* args),/**< pointer to the job function */
609  void* jobarg /**< the job's argument */
610  )
611 {
612  SCIP_ALLOC( BMSallocMemory(job) );
613 
614  (*job)->jobid = jobid;
615  (*job)->jobfunc = jobfunc;
616  (*job)->args = jobarg;
617  (*job)->nextjob = NULL;
618 
619  return SCIP_OKAY;
620 }
621 
622 /** get a new job id for the new set of submitted jobs */
624  void
625  )
626 {
627  int id;
628  assert(_threadpool != NULL);
629 
630  SCIP_CALL_ABORT( SCIPtnyAcquireLock(&_threadpool->poollock) );
631  id = ++_threadpool->currentid;
632  SCIP_CALL_ABORT( SCIPtnyReleaseLock(&_threadpool->poollock) );
633 
634  return id;
635 }
636 
637 /** submit a job for parallel processing; the return value is a globally defined status */
639  SCIP_JOB* job, /**< pointer to the job to be submitted */
640  SCIP_SUBMITSTATUS* status /**< pointer to store the job's submit status */
641  )
642 {
643  assert(job != NULL);
644 
645  /* the job id must be set before submitting the job. The submitter controls whether a new id is required. */
646  assert(job->jobid == _threadpool->currentid);
647  SCIP_CALL( threadPoolAddWork(job, status) );
648 
649  return SCIP_OKAY;
650 }
651 
652 /** blocks until all jobs of the given jobid have finished
653  * and then returns the smallest SCIP_RETCODE of all the jobs
654  */
656  int jobid /**< the jobid of the jobs to wait for */
657  )
658 {
659  SCIP_RETCODE retcode;
660  SCIP_JOB* currjob;
661  SCIP_JOB* prevjob;
662 
663  SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
664 
665  while( isJobRunning(_threadpool->currentjobs, jobid) || isJobRunning(_threadpool->jobqueue, jobid) )
666  {
667  SCIP_CALL( SCIPtnyWaitCondition(&_threadpool->jobfinished, &_threadpool->poollock) );
668  }
669 
670  /* finding the location of the processed job in the currentjobs queue */
671  retcode = SCIP_OKAY;
672  currjob = _threadpool->finishedjobs->firstjob;
673  prevjob = NULL;
674 
675  while( currjob )
676  {
677  if( currjob->jobid == jobid )
678  {
679  SCIP_JOB* nextjob;
680 
681  /* if the job has the right jobid collect its retcode,
682  * remove it from the finished job list, and free it
683  */
684  retcode = MIN(retcode, currjob->retcode);
685 
686  /* removing the finished job from finished jobs list */
687  if( currjob == _threadpool->finishedjobs->firstjob )
688  {
689  _threadpool->finishedjobs->firstjob = currjob->nextjob;
690  }
691  else
692  {
693  assert(prevjob != NULL);
694  prevjob->nextjob = currjob->nextjob;
695  }
696 
697  if( currjob == _threadpool->finishedjobs->lastjob )
698  _threadpool->finishedjobs->lastjob = prevjob;
699 
700  _threadpool->finishedjobs->njobs--;
701 
702  /* update currjob and free finished job; prevjob stays the same */
703  nextjob = currjob->nextjob;
704  BMSfreeMemory(&currjob);
705  currjob = nextjob;
706  }
707  else
708  {
709  /* otherwise leave job untouched */
710  prevjob = currjob;
711  currjob = prevjob->nextjob;
712  }
713  }
714 
715  SCIP_CALL( SCIPtnyReleaseLock(&_threadpool->poollock) );
716 
717  return retcode;
718 }
719 
720 
721 /*
722  * locks
723  */
724 
725 /** initializes the given lock */
727  SCIP_LOCK** lock /**< the lock */
728  )
729 {
730  assert(lock != NULL);
731 
732  SCIP_ALLOC( BMSallocMemory(lock) );
733 
734  if( mtx_init(&(*lock)->lock, mtx_plain) == thrd_success )
735  return SCIP_OKAY;
736  else
737  {
738  BMSfreeMemory(lock);
739  return SCIP_ERROR;
740  }
741 }
742 
743 /** destroys the given lock */
745  SCIP_LOCK** lock /**< the lock */
746  )
747 {
748  assert(lock != NULL);
749 
750  mtx_destroy(&(*lock)->lock);
751  BMSfreeMemory(lock);
752 }
753 
754 /** acquires the given lock */
756  SCIP_LOCK* lock /**< the lock */
757  )
758 {
759  if( mtx_lock(&lock->lock) == thrd_success )
760  return SCIP_OKAY;
761  return SCIP_ERROR;
762 }
763 
764 /** releases the given lock */
766  SCIP_LOCK* lock /**< the lock */
767  )
768 {
769  if( mtx_unlock(&lock->lock) == thrd_success )
770  return SCIP_OKAY;
771  return SCIP_ERROR;
772 }
773 
774 
775 /*
776  * conditions
777  */
778 
779 /** initializes the given condition variable */
781  SCIP_CONDITION** condition /**< condition to be created and initialized */
782  )
783 {
784  assert(condition != NULL);
785 
786  SCIP_ALLOC( BMSallocMemory(condition) );
787 
788  if( cnd_init(&(*condition)->condition) == thrd_success )
789  return SCIP_OKAY;
790  return SCIP_ERROR;
791 }
792 
793 /** destroys the given condition variable */
795  SCIP_CONDITION** condition /**< condition to be destroyed and freed */
796  )
797 {
798  cnd_destroy(&(*condition)->condition);
799  BMSfreeMemory(condition);
800 }
801 
802 /** signals one waiting thread */
804  SCIP_CONDITION* condition /**< the condition variable to signal */
805  )
806 {
807  if( cnd_signal(&condition->condition) == thrd_success )
808  return SCIP_OKAY;
809  return SCIP_ERROR;
810 }
811 
812 /** signals all waiting threads */
813 SCIP_EXPORT
815  SCIP_CONDITION* condition /**< the condition variable to broadcast */
816  )
817 {
818  if( cnd_broadcast(&condition->condition) == thrd_success )
819  return SCIP_OKAY;
820  return SCIP_ERROR;
821 }
822 
823 /** waits on a condition variable. The given lock must be held by the caller and will
824  * be held when this function returns.
825  */
827  SCIP_CONDITION* condition, /**< the condition variable to wait on */
828  SCIP_LOCK* lock /**< the lock that is held by the caller */
829  )
830 {
831  if( cnd_wait(&condition->condition, &lock->lock) == thrd_success )
832  return SCIP_OKAY;
833  return SCIP_ERROR;
834 }
835 
836 /** returns the thread number */
838  void
839  )
840 {
841  return _threadnumber;
842 }
#define NULL
Definition: def.h:267
static int threadPoolThread(void *threadnum)
Definition: tpi_tnycthrd.c:249
enum SCIP_Submitstatus SCIP_SUBMITSTATUS
Definition: type_tpi.h:50
SCIP_RETCODE SCIPtpiAcquireLock(SCIP_LOCK *lock)
Definition: tpi_tnycthrd.c:755
#define SCIPtnyInitCondition(condition)
Definition: tpi_tnycthrd.c:49
SCIP_JOB * firstjob
Definition: tpi_openmp.c:83
#define FALSE
Definition: def.h:94
enum SCIP_Jobstatus SCIP_JOBSTATUS
Definition: type_tpi.h:65
void * args
Definition: tpi_openmp.c:76
static SCIP_JOBSTATUS checkJobQueue(SCIP_JOBQUEUE *jobqueue, int jobid)
Definition: tpi_tnycthrd.c:534
#define TRUE
Definition: def.h:93
enum SCIP_Retcode SCIP_RETCODE
Definition: type_retcode.h:63
#define SCIP_UNUSED(x)
Definition: def.h:434
#define BMSallocMemoryArray(ptr, num)
Definition: memory.h:123
SCIP_JOB * lastjob
Definition: tpi_openmp.c:84
SCIP_JOBQUEUE * finishedjobs
Definition: tpi_tnycthrd.c:103
static SCIP_RETCODE createThreadPool(SCIP_THREADPOOL **thrdpool, int nthreads, int qsize, SCIP_Bool blockwhenfull)
Definition: tpi_tnycthrd.c:258
#define BMSfreeMemory(ptr)
Definition: memory.h:145
SCIP_JOBQUEUE * jobqueue
Definition: tpi_tnycthrd.c:100
thrd_t * threads
Definition: tpi_tnycthrd.c:99
SCIP_RETCODE SCIPtpiCreateJob(SCIP_JOB **job, int jobid, SCIP_RETCODE(*jobfunc)(void *args), void *jobarg)
Definition: tpi_tnycthrd.c:605
SCIP_RETCODE SCIPtpiInit(int nthreads, int queuesize, SCIP_Bool blockwhenfull)
Definition: tpi_tnycthrd.c:581
mtx_t lock
Definition: tpi_tnycthrd.c:58
SCIP_JOBQUEUE * currentjobs
Definition: tpi_tnycthrd.c:101
#define BMSfreeMemoryArray(ptr)
Definition: memory.h:147
SCIP_RETCODE SCIPtpiInitLock(SCIP_LOCK **lock)
Definition: tpi_tnycthrd.c:726
the type definitions for the SCIP parallel interface
SCIP_Bool blockwhenfull
Definition: tpi_tnycthrd.c:105
SCIP_RETCODE retcode
Definition: tpi_openmp.c:77
SCIP_RETCODE SCIPtpiSubmitJob(SCIP_JOB *job, SCIP_SUBMITSTATUS *status)
Definition: tpi_tnycthrd.c:638
SCIP_Bool queueopen
Definition: tpi_tnycthrd.c:110
static SCIP_RETCODE threadPoolThreadRetcode(void *threadnum)
Definition: tpi_tnycthrd.c:122
struct SCIP_Job * nextjob
Definition: tpi_openmp.c:74
#define SCIPtnyInitLock(lock)
Definition: tpi_tnycthrd.c:43
#define SCIPtnySignalCondition(condition)
Definition: tpi_tnycthrd.c:51
SCIP_RETCODE SCIPtpiExit(void)
Definition: tpi_tnycthrd.c:593
#define SCIP_CALL(x)
Definition: def.h:380
int jobid
Definition: tpi_openmp.c:73
int SCIPtpiGetNumThreads(void)
Definition: tpi_tnycthrd.c:573
SCIP_RETCODE SCIPtpiSignalCondition(SCIP_CONDITION *condition)
Definition: tpi_tnycthrd.c:803
#define SCIP_Bool
Definition: def.h:91
#define MIN(x, y)
Definition: def.h:243
SCIP_Bool shutdown
Definition: tpi_tnycthrd.c:109
void SCIPtpiDestroyCondition(SCIP_CONDITION **condition)
Definition: tpi_tnycthrd.c:794
#define SCIPtnyAcquireLock(lock)
Definition: tpi_tnycthrd.c:45
static SCIP_RETCODE threadPoolAddWork(SCIP_JOB *newjob, SCIP_SUBMITSTATUS *status)
Definition: tpi_tnycthrd.c:366
int SCIPtpiGetThreadNum(void)
Definition: tpi_tnycthrd.c:837
SCIP_RETCODE SCIPtpiReleaseLock(SCIP_LOCK *lock)
Definition: tpi_tnycthrd.c:765
#define SCIPtnyBroadcastCondition(condition)
Definition: tpi_tnycthrd.c:52
void SCIPtpiDestroyLock(SCIP_LOCK **lock)
Definition: tpi_tnycthrd.c:744
public methods for message output
static SCIP_RETCODE freeThreadPool(SCIP_THREADPOOL **thrdpool, SCIP_Bool finishjobs, SCIP_Bool completequeue)
Definition: tpi_tnycthrd.c:448
#define SCIPtnyWaitCondition(condition, lock)
Definition: tpi_tnycthrd.c:53
#define SCIPtnyDestroyLock(lock)
Definition: tpi_tnycthrd.c:44
static void freeJobQueue(SCIP_THREADPOOL *thrdpool)
Definition: tpi_tnycthrd.c:423
#define BMSallocMemory(ptr)
Definition: memory.h:118
SCIP_RETCODE(* jobfunc)(void *args)
Definition: tpi_openmp.c:75
SCIP_RETCODE SCIPtpiBroadcastCondition(SCIP_CONDITION *condition)
Definition: tpi_tnycthrd.c:814
#define SCIPtnyReleaseLock(lock)
Definition: tpi_tnycthrd.c:46
#define SCIPtnyDestroyCondition(condition)
Definition: tpi_tnycthrd.c:50
SCIP_RETCODE SCIPtpiCollectJobs(int jobid)
Definition: tpi_tnycthrd.c:655
SCIP_RETCODE SCIPtpiWaitCondition(SCIP_CONDITION *condition, SCIP_LOCK *lock)
Definition: tpi_tnycthrd.c:826
#define SCIP_CALL_ABORT(x)
Definition: def.h:359
#define SCIP_ALLOC(x)
Definition: def.h:391
static SCIP_Bool isJobRunning(SCIP_JOBQUEUE *currentjobs, int jobid)
Definition: tpi_tnycthrd.c:561
SCIP_RETCODE SCIPtpiInitCondition(SCIP_CONDITION **condition)
Definition: tpi_tnycthrd.c:780
int SCIPtpiGetNewJobID(void)
Definition: tpi_tnycthrd.c:623
static void jobQueueAddJob(SCIP_THREADPOOL *threadpool, SCIP_JOB *newjob)
Definition: tpi_tnycthrd.c:334
omp_lock_t lock
Definition: tpi_openmp.c:57
memory allocation routines