parallel execution is now explicit using one of the new built-in

predicates: parallel/1, parallel_findall/3 or parallel_once/1.
This commit is contained in:
Ricardo Rocha 2011-06-21 15:19:07 +01:00
parent 77171d4179
commit 8116aac432
14 changed files with 206 additions and 451 deletions

View File

@ -4201,13 +4201,6 @@ p_exitundefp( USES_REGS1 )
return FALSE;
}
#ifndef YAPOR
static Int
p_default_sequential( USES_REGS1 ) {
return(TRUE);
}
#endif
#ifdef DEBUG
extern void DumpActiveGoals(void);
@ -4366,11 +4359,14 @@ typedef void (*Proc)(void);
Proc E_Modules[]= {/* init_fc,*/ (Proc) 0 };
#ifndef YAPOR
static
Int p_yapor_threads( USES_REGS1 ) {
static Int p_parallel_mode( USES_REGS1 ) {
return FALSE;
}
#endif
static Int p_yapor_workers( USES_REGS1 ) {
return FALSE;
}
#endif /* YAPOR */
void
@ -4454,9 +4450,9 @@ Yap_InitCPreds(void)
Yap_InitCPred("$has_yap_or", 0, p_has_yap_or, SafePredFlag|SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$has_eam", 0, p_has_eam, SafePredFlag|SyncPredFlag|HiddenPredFlag);
#ifndef YAPOR
Yap_InitCPred("$c_default_sequential", 1, p_default_sequential, SafePredFlag|SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$c_yapor_threads", 1, p_yapor_threads, SafePredFlag|SyncPredFlag|HiddenPredFlag);
#endif
Yap_InitCPred("parallel_mode", 1, p_parallel_mode, SafePredFlag|SyncPredFlag);
Yap_InitCPred("$c_yapor_workers", 1, p_yapor_workers, SafePredFlag|SyncPredFlag|HiddenPredFlag);
#endif /* YAPOR */
#ifdef INES
Yap_InitCPred("euc_dist", 3, p_euc_dist, SafePredFlag);
Yap_InitCPred("loop", 0, p_loop, SafePredFlag);

View File

@ -102,11 +102,6 @@ void Yap_init_global_optyap_data(int max_table_size, int n_workers, int sch_loop
GLOBAL_scheduler_loop = sch_loop;
GLOBAL_delayed_release_load = delay_load;
/* global data related to or-performance */
GLOBAL_number_goals = 0;
GLOBAL_best_times(0) = 0;
GLOBAL_performance_mode = PERFORMANCE_OFF;
/* global data related to or-parallelism */
ALLOC_OR_FRAME(GLOBAL_root_or_fr);
BITMAP_clear(GLOBAL_bm_present_workers);
@ -130,10 +125,10 @@ void Yap_init_global_optyap_data(int max_table_size, int n_workers, int sch_loop
GLOBAL_locks_who_locked_heap = MAX_WORKERS;
INIT_LOCK(GLOBAL_locks_heap_access);
INIT_LOCK(GLOBAL_locks_alloc_block);
if (GLOBAL_number_workers== 1)
GLOBAL_parallel_execution_mode = FALSE;
if (GLOBAL_number_workers == 1)
GLOBAL_parallel_mode = PARALLEL_MODE_OFF;
else
GLOBAL_parallel_execution_mode = TRUE;
GLOBAL_parallel_mode = PARALLEL_MODE_ON;
#endif /* YAPOR */
#ifdef TABLING

View File

@ -52,16 +52,13 @@ static Int p_show_statistics_table( USES_REGS1 );
static Int p_show_statistics_tabling( USES_REGS1 );
static Int p_show_statistics_global_trie( USES_REGS1 );
#endif /* TABLING */
static Int p_yapor_threads( USES_REGS1 );
#ifdef YAPOR
static Int p_parallel_mode( USES_REGS1 );
static Int p_yapor_start( USES_REGS1 );
static Int p_yapor_workers( USES_REGS1 );
static Int p_worker( USES_REGS1 );
static Int p_yapor_on( USES_REGS1 );
static Int p_start_yapor( USES_REGS1 );
static Int p_default_sequential( USES_REGS1 );
static Int p_execution_mode( USES_REGS1 );
static Int p_performance( USES_REGS1 );
static Int p_parallel_new_answer( USES_REGS1 );
static Int p_parallel_yes_answer( USES_REGS1 );
static Int p_show_statistics_or( USES_REGS1 );
#endif /* YAPOR */
#if defined(YAPOR) && defined(TABLING)
@ -108,8 +105,6 @@ static inline long show_statistics_table_subgoal_answer_frames(IOSTREAM *out);
#ifdef YAPOR
#define TIME_RESOLUTION 1000000
#define NO_ANSWER 0
#define YES_ANSWER -1
static int length_answer;
static qg_ans_fr_ptr actual_answer;
#endif /* YAPOR */
@ -140,16 +135,12 @@ void Yap_init_optyap_preds(void) {
Yap_InitCPred("tabling_statistics", 1, p_show_statistics_tabling, SafePredFlag|SyncPredFlag);
Yap_InitCPred("global_trie_statistics", 1, p_show_statistics_global_trie, SafePredFlag|SyncPredFlag);
#endif /* TABLING */
Yap_InitCPred("$c_yapor_threads", 1, p_yapor_threads, SafePredFlag|SyncPredFlag|HiddenPredFlag);
#ifdef YAPOR
Yap_InitCPred("parallel_mode", 1, p_parallel_mode, SafePredFlag|SyncPredFlag);
Yap_InitCPred("$c_yapor_start", 0, p_yapor_start, SafePredFlag|SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$c_yapor_workers", 1, p_yapor_workers, SafePredFlag|SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$c_worker", 0, p_worker, SafePredFlag|SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$c_yapor_on", 0, p_yapor_on, SafePredFlag|SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$c_start_yapor", 0, p_start_yapor, SafePredFlag|SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$c_default_sequential", 1, p_default_sequential, SafePredFlag|SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("execution_mode", 1, p_execution_mode, SafePredFlag|SyncPredFlag);
Yap_InitCPred("performance", 1, p_performance, SafePredFlag|SyncPredFlag);
Yap_InitCPred("$c_parallel_new_answer", 1, p_parallel_new_answer, SafePredFlag|SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$c_parallel_yes_answer", 0, p_parallel_yes_answer, SafePredFlag|SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("or_statistics", 1, p_show_statistics_or, SafePredFlag|SyncPredFlag);
#endif /* YAPOR */
#if defined(YAPOR) && defined(TABLING)
@ -162,7 +153,8 @@ void Yap_init_optyap_preds(void) {
#ifdef YAPOR
void finish_yapor(void) {
GLOBAL_execution_time = current_time() - GLOBAL_execution_time;
show_answers();
GLOBAL_parallel_mode = PARALLEL_MODE_ON;
/* show_answers(); */
return;
}
#endif /* YAPOR */
@ -544,7 +536,58 @@ static Int p_show_statistics_global_trie( USES_REGS1 ) {
** YapOr C Predicates **
*********************************/
static Int p_yapor_threads( USES_REGS1 ) {
#ifdef YAPOR
static Int p_parallel_mode( USES_REGS1 ) {
Term t;
t = Deref(ARG1);
if (IsVarTerm(t)) {
Term ta;
if (GLOBAL_parallel_mode == PARALLEL_MODE_OFF)
ta = MkAtomTerm(Yap_LookupAtom("off"));
else if (GLOBAL_parallel_mode == PARALLEL_MODE_ON)
ta = MkAtomTerm(Yap_LookupAtom("on"));
else /* PARALLEL_MODE_RUNNING */
ta = MkAtomTerm(Yap_LookupAtom("running"));
Bind((CELL *)t, ta);
return(TRUE);
}
if (IsAtomTerm(t) && GLOBAL_parallel_mode != PARALLEL_MODE_RUNNING) {
char *s;
s = RepAtom(AtomOfTerm(t))->StrOfAE;
if (strcmp(s,"on") == 0) {
GLOBAL_parallel_mode = PARALLEL_MODE_ON;
return(TRUE);
}
if (strcmp(s,"off") == 0) {
GLOBAL_parallel_mode = PARALLEL_MODE_OFF;
return(TRUE);
}
return(FALSE); /* PARALLEL_MODE_RUNNING */
}
return(FALSE);
}
static Int p_yapor_start( USES_REGS1 ) {
#ifdef TIMESTAMP_CHECK
GLOBAL_timestamp = 0;
#endif /* TIMESTAMP_CHECK */
BITMAP_delete(GLOBAL_bm_idle_workers, 0);
BITMAP_clear(GLOBAL_bm_invisible_workers);
BITMAP_clear(GLOBAL_bm_requestable_workers);
#ifdef TABLING_INNER_CUTS
BITMAP_clear(GLOBAL_bm_pruning_workers);
#endif /* TABLING_INNER_CUTS */
make_root_choice_point();
GLOBAL_parallel_mode = PARALLEL_MODE_RUNNING;
GLOBAL_execution_time = current_time();
BITMAP_clear(GLOBAL_bm_finished_workers);
PUT_IN_EXECUTING(worker_id);
return (TRUE);
}
static Int p_yapor_workers( USES_REGS1 ) {
#ifdef YAPOR_THREADS
return Yap_unify(MkIntegerTerm(GLOBAL_number_workers),ARG1);
#else
@ -553,7 +596,6 @@ static Int p_yapor_threads( USES_REGS1 ) {
}
#ifdef YAPOR
static Int p_worker( USES_REGS1 ) {
CurrentModule = USER_MODULE;
P = GETWORK_FIRST_TIME;
@ -561,162 +603,6 @@ static Int p_worker( USES_REGS1 ) {
}
static Int p_yapor_on( USES_REGS1 ) {
return (GLOBAL_parallel_execution_mode);
}
static Int p_start_yapor( USES_REGS1 ) {
#ifdef TIMESTAMP_CHECK
GLOBAL_timestamp = 0;
#endif /* TIMESTAMP_CHECK */
GLOBAL_answers = NO_ANSWER;
BITMAP_delete(GLOBAL_bm_idle_workers, 0);
BITMAP_clear(GLOBAL_bm_invisible_workers);
BITMAP_clear(GLOBAL_bm_requestable_workers);
#ifdef TABLING_INNER_CUTS
BITMAP_clear(GLOBAL_bm_pruning_workers);
#endif /* TABLING_INNER_CUTS */
make_root_choice_point();
GLOBAL_performance_mode &= ~PERFORMANCE_IN_EXECUTION;
GLOBAL_execution_time = current_time();
BITMAP_clear(GLOBAL_bm_finished_workers);
PUT_IN_EXECUTING(worker_id);
return (TRUE);
}
static Int p_default_sequential( USES_REGS1 ) {
Term t;
t = Deref(ARG1);
if (IsVarTerm(t)) {
Term ta;
if (SEQUENTIAL_IS_DEFAULT)
ta = MkAtomTerm(Yap_LookupAtom("on"));
else
ta = MkAtomTerm(Yap_LookupAtom("off"));
Bind((CELL *)t, ta);
return(TRUE);
}
if (IsAtomTerm(t)) {
char *s;
s = RepAtom(AtomOfTerm(t))->StrOfAE;
if (strcmp(s, "on") == 0) {
SEQUENTIAL_IS_DEFAULT = TRUE;
return(TRUE);
}
if (strcmp(s,"off") == 0) {
SEQUENTIAL_IS_DEFAULT = FALSE;
return(TRUE);
}
}
return(FALSE);
}
static Int p_execution_mode( USES_REGS1 ) {
Term t;
t = Deref(ARG1);
if (IsVarTerm(t)) {
Term ta;
if (GLOBAL_parallel_execution_mode)
ta = MkAtomTerm(Yap_LookupAtom("parallel"));
else
ta = MkAtomTerm(Yap_LookupAtom("sequential"));
Bind((CELL *)t, ta);
return(TRUE);
}
if (IsAtomTerm(t)) {
char *s;
s = RepAtom(AtomOfTerm(t))->StrOfAE;
if (strcmp(s,"parallel") == 0) {
GLOBAL_parallel_execution_mode = TRUE;
return(TRUE);
}
if (strcmp(s,"sequential") == 0) {
GLOBAL_parallel_execution_mode = FALSE;
return(TRUE);
}
}
return(FALSE);
}
static Int p_performance( USES_REGS1 ) {
Term t;
realtime one_worker_execution_time = 0;
int i;
GLOBAL_performance_mode |= PERFORMANCE_IN_EXECUTION;
t = Deref(ARG1);
if (IsVarTerm(t)) {
Term ta;
if (GLOBAL_performance_mode & PERFORMANCE_ON) {
ta = MkAtomTerm(Yap_LookupAtom("on"));
} else {
ta = MkAtomTerm(Yap_LookupAtom("off"));
}
Bind((CELL *)t, ta);
return(TRUE);
}
if (IsAtomTerm(t)) {
char *s;
s = RepAtom(AtomOfTerm(t))->StrOfAE;
if (strcmp(s, "on") == 0) {
GLOBAL_performance_mode |= PERFORMANCE_ON;
return(TRUE);
}
if (strcmp(s,"off") == 0) {
GLOBAL_performance_mode &= ~PERFORMANCE_ON;
return(TRUE);
}
if (strcmp(s,"clear") == 0) {
GLOBAL_number_goals = 0;
GLOBAL_best_times(0) = 0;
return(TRUE);
}
}
if (IsIntTerm(t))
one_worker_execution_time = IntOfTerm(t);
else if (IsFloatTerm(t))
one_worker_execution_time = FloatOfTerm(t);
else
return(FALSE);
if (GLOBAL_number_goals) {
Sfprintf(Soutput, "[\n Best execution times:\n");
for (i = 1; i <= GLOBAL_number_goals; i++) {
Sfprintf(Soutput, " %d. time: %f seconds", i, GLOBAL_best_times(i));
if (one_worker_execution_time != 0)
Sfprintf(Soutput, " --> speedup %f (%6.2f %% )\n",
one_worker_execution_time / GLOBAL_best_times(i),
one_worker_execution_time / GLOBAL_best_times(i) / GLOBAL_number_workers* 100 );
else Sfprintf(Soutput, "\n");
}
Sfprintf(Soutput, " Average : %f seconds",
GLOBAL_best_times(0) / GLOBAL_number_goals);
if (one_worker_execution_time != 0)
Sfprintf(Soutput, " --> speedup %f (%6.2f %% )",
one_worker_execution_time * GLOBAL_number_goals / GLOBAL_best_times(0),
one_worker_execution_time * GLOBAL_number_goals / GLOBAL_best_times(0) / GLOBAL_number_workers* 100 );
if (GLOBAL_number_goals >= 3) {
Sfprintf(Soutput, "\n Average (best three): %f seconds",
(GLOBAL_best_times(1) + GLOBAL_best_times(2) + GLOBAL_best_times(3)) / 3);
if (one_worker_execution_time != 0)
Sfprintf(Soutput, " --> speedup %f (%6.2f %% ) ]\n\n",
one_worker_execution_time * 3 / (GLOBAL_best_times(1) + GLOBAL_best_times(2) + GLOBAL_best_times(3)),
one_worker_execution_time * 3 / (GLOBAL_best_times(1) + GLOBAL_best_times(2) + GLOBAL_best_times(3)) / GLOBAL_number_workers* 100 );
else Sfprintf(Soutput, "\n]\n\n");
} else Sfprintf(Soutput, "\n]\n\n");
Sflush(Soutput);
return (TRUE);
}
return (FALSE);
}
static Int p_parallel_new_answer( USES_REGS1 ) {
or_fr_ptr leftmost_or_fr;
@ -738,12 +624,6 @@ static Int p_parallel_new_answer( USES_REGS1 ) {
}
static Int p_parallel_yes_answer( USES_REGS1 ) {
GLOBAL_answers = YES_ANSWER;
return (TRUE);
}
static Int p_show_statistics_or( USES_REGS1 ) {
IOSTREAM *out;
long total_bytes = 0, aux_bytes;
@ -978,7 +858,7 @@ static inline int parallel_new_answer_putchar(int sno, int ch) {
static inline void show_answers(void) {
CACHE_REGS
int i;
int i, answers = 0;
if (OrFr_qg_solutions(LOCAL_top_or_fr)) {
qg_ans_fr_ptr aux_answer1, aux_answer2;
aux_answer1 = SolFr_first(OrFr_qg_solutions(LOCAL_top_or_fr));
@ -987,47 +867,25 @@ static inline void show_answers(void) {
aux_answer2 = aux_answer1;
aux_answer1 = AnsFr_next(aux_answer1);
FREE_QG_ANSWER_FRAME(aux_answer2);
GLOBAL_answers++;
answers++;
}
FREE_QG_SOLUTION_FRAME(OrFr_qg_solutions(LOCAL_top_or_fr));
OrFr_qg_solutions(LOCAL_top_or_fr) = NULL;
}
switch(GLOBAL_answers) {
case YES_ANSWER:
Sfprintf(Serror, "[ yes");
break;
case NO_ANSWER:
Sfprintf(Serror, "[ no");
switch(answers) {
case 0:
Sfprintf(Serror, "[ no answers found");
break;
case 1:
Sfprintf(Serror, "[ 1 answer found");
break;
default:
Sfprintf(Serror, "[ %d answers found", GLOBAL_answers);
Sfprintf(Serror, "[ %d answers found", answers);
break;
}
Sfprintf(Serror, " (in %f seconds) ]\n\n", GLOBAL_execution_time);
Sflush(Serror);
if (GLOBAL_performance_mode == PERFORMANCE_ON) {
for (i = GLOBAL_number_goals; i > 0; i--) {
if (GLOBAL_best_times(i) > GLOBAL_execution_time) {
if (i + 1 < MAX_BEST_TIMES)
GLOBAL_best_times(i + 1) = GLOBAL_best_times(i);
else {
GLOBAL_best_times(0) -= GLOBAL_best_times(i);
}
}
else break;
}
if (i + 1 < MAX_BEST_TIMES) {
GLOBAL_best_times(0) += GLOBAL_execution_time;
GLOBAL_best_times(i + 1) = GLOBAL_execution_time;
if (GLOBAL_number_goals + 1 < MAX_BEST_TIMES)
GLOBAL_number_goals++;
}
}
return;
}

View File

@ -177,13 +177,8 @@ struct global_optyap_data {
int master_worker;
#endif /* YAPOR_COW */
/* global data related to or-performance */
realtime execution_time;
realtime best_execution_times[MAX_BEST_TIMES];
int number_of_executed_goals;
char performance_mode; /* PERFORMANCE_OFF / PERFORMANCE_ON / PERFORMANCE_IN_EXECUTION */
/* global data related to or-parallelism */
realtime execution_time;
#ifdef YAPOR_THREADS
Int root_choice_point_offset;
#else
@ -202,8 +197,7 @@ struct global_optyap_data {
#endif /* TABLING_INNER_CUTS */
struct global_optyap_locks locks;
volatile unsigned int branch[MAX_WORKERS][MAX_BRANCH_DEPTH];
volatile char parallel_execution_mode; /* TRUE / FALSE */
volatile int answers;
volatile char parallel_mode; /* PARALLEL_MODE_OFF / PARALLEL_MODE_ON / PARALLEL_MODE_RUNNING */
#endif /* YAPOR */
#ifdef TABLING
@ -249,9 +243,6 @@ struct global_optyap_data {
#define GLOBAL_worker_pid(worker) (GLOBAL_optyap_data.worker_pid[worker])
#define GLOBAL_master_worker (GLOBAL_optyap_data.master_worker)
#define GLOBAL_execution_time (GLOBAL_optyap_data.execution_time)
#define GLOBAL_best_times(time) (GLOBAL_optyap_data.best_execution_times[time])
#define GLOBAL_number_goals (GLOBAL_optyap_data.number_of_executed_goals)
#define GLOBAL_performance_mode (GLOBAL_optyap_data.performance_mode)
#ifdef YAPOR_THREADS
#define Get_GLOBAL_root_cp() offset_to_cptr(GLOBAL_optyap_data.root_choice_point_offset)
#define Set_GLOBAL_root_cp(bptr) (GLOBAL_optyap_data.root_choice_point_offset = cptr_to_offset(bptr))
@ -280,8 +271,7 @@ struct global_optyap_data {
#define GLOBAL_locks_heap_access (GLOBAL_optyap_data.locks.heap_access)
#define GLOBAL_locks_alloc_block (GLOBAL_optyap_data.locks.alloc_block)
#define GLOBAL_branch(worker, depth) (GLOBAL_optyap_data.branch[worker][depth])
#define GLOBAL_parallel_execution_mode (GLOBAL_optyap_data.parallel_execution_mode)
#define GLOBAL_answers (GLOBAL_optyap_data.answers)
#define GLOBAL_parallel_mode (GLOBAL_optyap_data.parallel_mode)
#define GLOBAL_root_gt (GLOBAL_optyap_data.root_global_trie)
#define GLOBAL_root_tab_ent (GLOBAL_optyap_data.root_table_entry)
#define GLOBAL_first_sg_fr (GLOBAL_optyap_data.first_subgoal_frame)

View File

@ -46,7 +46,10 @@ static void share_private_nodes(int worker_q);
#define INCREMENTAL_COPY 1
#if INCREMENTAL_COPY
#define COMPUTE_SEGMENTS_TO_COPY_TO(Q) \
REMOTE_start_global_copy(Q) = (CELL) (REMOTE_top_cp(Q)->cp_h); \
if (REMOTE_top_cp(Q) == GLOBAL_root_cp) \
REMOTE_start_global_copy(Q) = (CELL) (H0); \
else \
REMOTE_start_global_copy(Q) = (CELL) (REMOTE_top_cp(Q)->cp_h); \
REMOTE_end_global_copy(Q) = (CELL) (B->cp_h); \
REMOTE_start_local_copy(Q) = (CELL) (B); \
REMOTE_end_local_copy(Q) = (CELL) (REMOTE_top_cp(Q)); \
@ -102,7 +105,7 @@ void make_root_choice_point(void) {
B = LOCAL_top_cp = GLOBAL_root_cp;
B->cp_tr = TR = ((choiceptr) (worker_offset(0) + (CELL)(B)))->cp_tr;
}
B->cp_h = H0;
// B->cp_h = H0;
B->cp_ap = GETWORK;
B->cp_or_fr = GLOBAL_root_or_fr;
LOCAL_top_or_fr = GLOBAL_root_or_fr;

View File

@ -94,13 +94,13 @@ STD_PROTO(static inline qg_sol_fr_ptr CUT_prune_solution_frames, (qg_sol_fr_ptr,
/* ---------------------------- **
** Performance Macros **
** ---------------------------- */
/* ------------------------------ **
** Parallel Mode Macros **
** ------------------------------ */
#define PERFORMANCE_OFF 0x0
#define PERFORMANCE_ON 0x1
#define PERFORMANCE_IN_EXECUTION 0x2
#define PARALLEL_MODE_OFF 0
#define PARALLEL_MODE_ON 1
#define PARALLEL_MODE_RUNNING 2
@ -177,7 +177,7 @@ STD_PROTO(static inline qg_sol_fr_ptr CUT_prune_solution_frames, (qg_sol_fr_ptr,
}
#define CUT_wait_leftmost() \
if (GLOBAL_parallel_execution_mode) { \
if (GLOBAL_parallel_mode == PARALLEL_MODE_RUNNING) { \
/* parallel execution mode --> wait until leftmost */ \
int i, loop, depth, ltt; \
bitmap members; \

View File

@ -18,6 +18,7 @@
#include "Yap.h"
#if defined(YAPOR_COPY) || defined(YAPOR_COW) || defined(YAPOR_SBA)
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <sys/shm.h>
@ -35,7 +36,8 @@
#define GLOBAL_LOCAL_STRUCTS_AREA ADJUST_SIZE_TO_PAGE(sizeof(struct global_data) + MAX_WORKERS * sizeof(struct worker_local))
#ifdef MMAP_MEMORY_MAPPING_SCHEME
int fd_mapfile;
#define PATH_MAX 1000
char mapfile_path[PATH_MAX];
#elif SHM_MEMORY_MAPPING_SCHEME
int shm_mapid[MAX_WORKERS + 2];
#endif /* MEMORY_MAPPING_SCHEME */
@ -62,10 +64,17 @@ void Yap_init_yapor_global_local_memory(void) {
Yap_global = (struct global_data *)(MMAP_ADDR - sizeof(struct global_data));
#ifdef MMAP_MEMORY_MAPPING_SCHEME
{ char mapfile[20];
strcpy(mapfile,"./mapfile");
itos(getpid(), &mapfile[9]);
if ((fd_mapfile = open(mapfile, O_RDWR|O_CREAT|O_TRUNC, 0666)) < 0)
{ int fd_mapfile;
if (getcwd(mapfile_path,PATH_MAX) == NULL)
Yap_Error(FATAL_ERROR, TermNil, "getcwd error (Yap_init_yapor_global_local_memory)");
strcat(mapfile_path,"/mapfile");
itos(getpid(), &mapfile_path[strlen(mapfile_path)]);
if (strlen(mapfile_path) >= PATH_MAX)
Yap_Error(FATAL_ERROR, TermNil, "PATH_MAX error (Yap_init_yapor_global_local_memory)");
printf("***************** %s\n", mapfile_path);
if ((fd_mapfile = open(mapfile_path, O_RDWR|O_CREAT|O_TRUNC, 0666)) < 0)
Yap_Error(FATAL_ERROR, TermNil, "open error (Yap_init_yapor_global_local_memory)");
if (lseek(fd_mapfile, GLOBAL_LOCAL_STRUCTS_AREA, SEEK_SET) < 0)
Yap_Error(FATAL_ERROR, TermNil, "lseek error (Yap_init_yapor_global_local_memory)");
@ -73,6 +82,8 @@ void Yap_init_yapor_global_local_memory(void) {
Yap_Error(FATAL_ERROR, TermNil, "write error (Yap_init_yapor_global_local_memory)");
if (mmap((void *) Yap_local, (size_t) GLOBAL_LOCAL_STRUCTS_AREA, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, fd_mapfile, 0) == (void *) -1)
Yap_Error(FATAL_ERROR, TermNil, "mmap error (Yap_init_global_local_memory)");
if (close(fd_mapfile) == -1)
Yap_Error(FATAL_ERROR, TermNil, "close error (Yap_init_yapor_global_local_memory)");
}
#elif SHM_MEMORY_MAPPING_SCHEME
/* place as segment MAX_WORKERS (0..MAX_WORKERS-1 reserved for worker areas) */
@ -101,12 +112,18 @@ void Yap_init_yapor_stacks_memory(UInt TrailStackArea, UInt HeapStackArea, UInt
#ifdef MMAP_MEMORY_MAPPING_SCHEME
/* map stacks in a single go */
if (lseek(fd_mapfile, GLOBAL_LOCAL_STRUCTS_AREA + StacksArea, SEEK_SET) < 0)
Yap_Error(FATAL_ERROR, TermNil, "lseek error (Yap_init_yapor_stacks_memory)");
if (write(fd_mapfile, "", 1) < 0)
Yap_Error(FATAL_ERROR, TermNil, "write error (Yap_init_yapor_stacks_memory)");
if (mmap((void *) Yap_HeapBase, (size_t) StacksArea, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, fd_mapfile, GLOBAL_LOCAL_STRUCTS_AREA) == (void *) -1)
Yap_Error(FATAL_ERROR, TermNil, "mmap error (Yap_init_yapor_stacks_memory)");
{ int fd_mapfile;
if ((fd_mapfile = open(mapfile_path, O_RDWR)) < 0)
Yap_Error(FATAL_ERROR, TermNil, "open error ( Yap_init_yapor_stacks_memory)");
if (lseek(fd_mapfile, GLOBAL_LOCAL_STRUCTS_AREA + StacksArea, SEEK_SET) < 0)
Yap_Error(FATAL_ERROR, TermNil, "lseek error (Yap_init_yapor_stacks_memory)");
if (write(fd_mapfile, "", 1) < 0)
Yap_Error(FATAL_ERROR, TermNil, "write error (Yap_init_yapor_stacks_memory)");
if (mmap((void *) Yap_HeapBase, (size_t) StacksArea, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, fd_mapfile, GLOBAL_LOCAL_STRUCTS_AREA) == (void *) -1)
Yap_Error(FATAL_ERROR, TermNil, "mmap error (Yap_init_yapor_stacks_memory)");
if (close(fd_mapfile) == -1)
Yap_Error(FATAL_ERROR, TermNil, "close error (Yap_init_yapor_stacks_memory)");
}
#elif SHM_MEMORY_MAPPING_SCHEME
/* place heap stack segment as MAX_WORKERS+1 */
shm_map_memory(MAX_WORKERS + 1, HeapStackArea, (void *) Yap_HeapBase);
@ -156,13 +173,18 @@ void Yap_remap_yapor_memory(void) {
int i;
void *remap_addr = LOCAL_GlobalBase;
#ifdef MMAP_MEMORY_MAPPING_SCHEME
int fd_mapfile;
long remap_offset = (ADDR) remap_addr - (ADDR) Yap_local;
if ((fd_mapfile = open(mapfile_path, O_RDWR)) < 0)
Yap_Error(FATAL_ERROR, TermNil, "open error (Yap_remap_yapor_memory)");
if (munmap(remap_addr, (size_t)(Yap_worker_area_size * GLOBAL_number_workers)) == -1)
Yap_Error(FATAL_ERROR, TermNil, "munmap error (Yap_remap_yapor_memory)");
for (i = 0; i < GLOBAL_number_workers; i++)
if (mmap(remap_addr + worker_offset(i), (size_t)Yap_worker_area_size, PROT_READ|PROT_WRITE,
MAP_SHARED|MAP_FIXED, fd_mapfile, remap_offset + i * Yap_worker_area_size) == (void *) -1)
Yap_Error(FATAL_ERROR, TermNil, "mmap error (Yap_remap_yapor_memory)");
if (close(fd_mapfile) == -1)
Yap_Error(FATAL_ERROR, TermNil, "close error (Yap_remap_yapor_memory)");
#else /* SHM_MEMORY_MAPPING_SCHEME */
for (i = 0; i < GLOBAL_number_workers; i++)
if (shmdt(remap_addr + Yap_worker_area_size * i) == -1)
@ -202,18 +224,10 @@ void Yap_unmap_yapor_memory (void) {
#endif /* YAPOR_COW */
#ifdef MMAP_MEMORY_MAPPING_SCHEME
{ char mapfile[20];
strcpy(mapfile,"./mapfile");
#if defined(YAPOR_COPY) || defined(YAPOR_SBA)
itos(GLOBAL_worker_pid(0), &mapfile[9]);
#elif defined(YAPOR_COW)
itos(GLOBAL_master_worker, &mapfile[9]);
#endif /* YAPOR_COPY || YAPOR_SBA || YAPOR_COW */
if (remove(mapfile) == 0)
INFORMATION_MESSAGE("Removing mapfile \"%s\"", mapfile);
else
INFORMATION_MESSAGE("Can't remove mapfile \"%s\"", mapfile);
}
if (remove(mapfile_path) == 0)
INFORMATION_MESSAGE("Removing mapfile \"%s\"", mapfile_path);
else
INFORMATION_MESSAGE("Can't remove mapfile \"%s\"", mapfile_path);
#elif SHM_MEMORY_MAPPING_SCHEME
#if defined(YAPOR_COPY) || defined(YAPOR_SBA)
shm_unmap_memory(MAX_WORKERS);

View File

@ -442,7 +442,7 @@
find_leader_node(leader_cp, leader_dep_on_stack);
store_consumer_node(tab_ent, sg_fr, leader_cp, leader_dep_on_stack);
#ifdef DEBUG_OPTYAP
if (GLOBAL_parallel_execution_mode) {
if (GLOBAL_parallel_mode == PARALLEL_MODE_RUNNING) {
choiceptr aux_cp;
aux_cp = B;
while (YOUNGER_CP(aux_cp, Get_LOCAL_top_cp_on_stack()))
@ -553,7 +553,7 @@
find_leader_node(leader_cp, leader_dep_on_stack);
store_consumer_node(tab_ent, sg_fr, leader_cp, leader_dep_on_stack);
#ifdef DEBUG_OPTYAP
if (GLOBAL_parallel_execution_mode) {
if (GLOBAL_parallel_mode == PARALLEL_MODE_RUNNING) {
choiceptr aux_cp;
aux_cp = B;
while (YOUNGER_CP(aux_cp, Get_LOCAL_top_cp_on_stack()))
@ -664,7 +664,7 @@
find_leader_node(leader_cp, leader_dep_on_stack);
store_consumer_node(tab_ent, sg_fr, leader_cp, leader_dep_on_stack);
#ifdef DEBUG_OPTYAP
if (GLOBAL_parallel_execution_mode) {
if (GLOBAL_parallel_mode == PARALLEL_MODE_RUNNING) {
choiceptr aux_cp;
aux_cp = B;
while (YOUNGER_CP(aux_cp, Get_LOCAL_top_cp_on_stack()))
@ -1213,7 +1213,7 @@
}
#endif /* YAPOR */
#ifdef DEBUG_OPTYAP
if (GLOBAL_parallel_execution_mode) {
if (GLOBAL_parallel_mode == PARALLEL_MODE_RUNNING) {
choiceptr aux_cp;
OPTYAP_ERROR_CHECKING(completion, YOUNGER_CP(Get_LOCAL_top_cp(), Get_LOCAL_top_cp_on_stack()));
aux_cp = chain_cp;
@ -1284,7 +1284,7 @@
}
#endif /* YAPOR */
#ifdef DEBUG_OPTYAP
if (GLOBAL_parallel_execution_mode) {
if (GLOBAL_parallel_mode == PARALLEL_MODE_RUNNING) {
choiceptr aux_cp;
OPTYAP_ERROR_CHECKING(completion, YOUNGER_CP(Get_LOCAL_top_cp(), Get_LOCAL_top_cp_on_stack()));
aux_cp = chain_cp;
@ -1409,7 +1409,7 @@
UNLOCK(DepFr_lock(dep_fr));
#ifdef DEBUG_OPTYAP
if (GLOBAL_parallel_execution_mode) {
if (GLOBAL_parallel_mode == PARALLEL_MODE_RUNNING) {
choiceptr aux_cp;
OPTYAP_ERROR_CHECKING(completion, Get_LOCAL_top_cp(), Get_LOCAL_top_cp_on_stack());
aux_cp = DepFr_cons_cp(dep_fr);
@ -1434,7 +1434,7 @@
}
#endif /* YAPOR */
#ifdef DEBUG_OPTYAP
if (GLOBAL_parallel_execution_mode) {
if (GLOBAL_parallel_mode == PARALLEL_MODE_RUNNING) {
choiceptr aux_cp;
OPTYAP_ERROR_CHECKING(completion, YOUNGER_CP(Get_LOCAL_top_cp(), Get_LOCAL_top_cp_on_stack()));
aux_cp = DepFr_cons_cp(dep_fr);

View File

@ -653,7 +653,7 @@ static inline void abolish_incomplete_subgoals(choiceptr prune_cp) {
if (EQUAL_OR_YOUNGER_CP(DepFr_cons_cp(LOCAL_top_dep_fr), prune_cp)) {
#ifdef YAPOR
if (GLOBAL_parallel_execution_mode)
if (GLOBAL_parallel_mode == PARALLEL_MODE_RUNNING)
pruning_over_tabling_data_structures();
#endif /* YAPOR */
do {
@ -667,7 +667,7 @@ static inline void abolish_incomplete_subgoals(choiceptr prune_cp) {
while (LOCAL_top_sg_fr && EQUAL_OR_YOUNGER_CP(SgFr_gen_cp(LOCAL_top_sg_fr), prune_cp)) {
sg_fr_ptr sg_fr;
#ifdef YAPOR
if (GLOBAL_parallel_execution_mode)
if (GLOBAL_parallel_mode == PARALLEL_MODE_RUNNING)
pruning_over_tabling_data_structures();
#endif /* YAPOR */
sg_fr = LOCAL_top_sg_fr;

View File

@ -121,7 +121,7 @@ true :- true.
nb_setval('$included_file',[]).
'$init_or_threads' :-
'$c_yapor_threads'(W), !,
'$c_yapor_workers'(W), !,
'$start_orp_threads'(W).
'$init_or_threads'.
@ -530,20 +530,6 @@ true :- true.
'$query'(end_of_file,_).
% ***************************
% * -------- YAPOR -------- *
% ***************************
'$query'(G,V) :-
\+ '$undefined'('$c_yapor_on', prolog),
'$c_yapor_on',
\+ '$undefined'('$c_start_yapor', prolog),
'$parallelizable'(G), !,
'$parallel_query'(G,V),
fail.
% end of YAPOR
'$query'(G,[]) :-
'$prompt_alternatives_on'(OPT),
( OPT = groundness ; OPT = determinism), !,

View File

@ -45,7 +45,6 @@
'$directive'(module_transparent(_)).
'$directive'(multifile(_)).
'$directive'(noprofile(_)).
'$directive'(parallel).
'$directive'(public(_)).
'$directive'(op(_,_,_)).
'$directive'(require(_)).
@ -53,8 +52,6 @@
'$directive'(reconsult(_)).
'$directive'(reexport(_)).
'$directive'(reexport(_,_)).
'$directive'(sequential).
'$directive'(sequential(_)).
'$directive'(thread_initialization(_)).
'$directive'(thread_local(_)).
'$directive'(uncutable(_)).
@ -85,14 +82,6 @@
'$expects_dialect'(D).
'$exec_directive'(encoding(Enc), _, _) :-
'$set_encoding'(Enc).
'$exec_directive'(parallel, _, _) :-
'$parallel'.
'$exec_directive'(sequential, _, _) :-
'$sequential'.
'$exec_directive'(sequential(G), _, M) :-
'$sequential_directive'(G, M).
'$exec_directive'(parallel(G), _, M) :-
'$parallel_directive'(G, M).
'$exec_directive'(include(F), Status, _) :-
'$include'(F, Status).
'$exec_directive'(module(N,P), Status, _) :-

View File

@ -566,7 +566,7 @@ yap_flag(system_options,X) :-
'$system_options'(low_level_tracer) :-
\+ '$undefined'(start_low_level_trace, prolog).
'$system_options'(or_parallelism) :-
\+ '$undefined'('$c_yapor_on', prolog).
\+ '$undefined'('$c_yapor_start', prolog).
'$system_options'(rational_trees) :-
'$yap_has_rational_trees'.
'$system_options'(readline) :-

View File

@ -128,8 +128,6 @@ yap_hacks:cut_by(CP) :- '$$cut_by'(CP).
:- '$change_type_of_char'(36,7). % Make $ a symbol character
:- default_sequential(off).
:- multifile user:library_directory/1.
:- dynamic user:library_directory/1.

View File

@ -12,7 +12,9 @@
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
:- meta_predicate
default_sequential(:).
parallel(:),
parallel_findall(?,:,?).
parallel_once(:).
@ -88,156 +90,80 @@ opt_statistics(table_subgoal_answer_frames,[BytesInUse,StructsInUse]) :-
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% default_sequential/1 %%
%% parallel/1 %%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
default_sequential(X) :-
'$c_default_sequential'(X), !.
default_sequential(_).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% $parallel_query/2 %%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
'$parallel_query'(G,[]) :- !,
'$c_start_yapor',
'$execute'(G), !,
'$c_parallel_yes_answer'.
'$parallel_query'(G,V) :-
'$c_start_yapor',
'$execute'(G),
'$c_parallel_new_answer'(V).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% $sequential/0 %%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
'$sequential' :-
'$c_default_sequential'(X),
'$initialization'('$c_default_sequential'(X)),
'$c_default_sequential'(on).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% $parallel/0 %%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
'$parallel' :-
'$c_default_sequential'(X),
'$initialization'('$c_default_sequential'(X)),
'$c_default_sequential'(off).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% $sequential_directive/2 %%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
'$sequential_directive'(Pred,Mod) :-
var(Pred), !,
'$do_error'(instantiation_error,sequential(Mod:Pred)).
'$sequential_directive'(Mod:Pred,_) :- !,
'$sequential_directive'(Pred,Mod).
'$sequential_directive'((Pred1,Pred2),Mod) :- !,
'$sequential_directive'(Pred1,Mod),
'$sequential_directive'(Pred2,Mod).
'$sequential_directive'(PredName/PredArity,Mod) :-
atom(PredName), integer(PredArity),
functor(PredFunctor,PredName,PredArity), !,
'$flags'(PredFunctor,Mod,Flags,Flags),
parallel(Goal) :-
parallel_mode(Mode), Mode = on, !,
(
Flags /\ 0x1991F880 =:= 0, !,
(
Flags /\ 0x00000020 =\= 0, !,
write(user_error, '[ Warning: '),
write(user_error, Mod:PredName/PredArity),
write(user_error, ' is already declared as sequential ]'),
nl(user_error)
;
NewFlags is Flags \/ 0x00000020,
'$flags'(PredFunctor,Mod,Flags,NewFlags)
)
'$parallel_query'(Goal)
;
write(user_error, '[ Error: '),
write(user_error, Mod:PredName/PredArity),
write(user_error, ' cannot be declared as sequential ]'),
nl(user_error),
fail
true
).
'$sequential_directive'(Pred,Mod) :-
'$do_error'(type_error(callable,Mod:Pred),sequential(Mod:Pred)).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% $parallel_directive/2 %%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
'$parallel_directive'(Pred,Mod) :-
var(Pred), !,
'$do_error'(instantiation_error,parallel(Mod:Pred)).
'$parallel_directive'((Pred1,Pred2),Mod) :- !,
'$parallel_directive'(Pred1,Mod),
'$parallel_directive'(Pred2,Mod).
'$parallel_directive'(Mod:Pred,_) :- !,
'$parallel_directive'(Pred,Mod).
'$parallel_directive'(PredName/PredArity,Mod) :-
atom(PredName), integer(PredArity),
functor(PredFunctor,PredName,PredArity), !,
'$flags'(PredFunctor,Mod,Flags,Flags),
parallel(Goal) :-
(
Flags /\ 0x1991F880 =:= 0, !,
(
Flags /\ 0x00000020 =:= 0, !,
write(user_error, '[ Warning: '),
write(user_error, Mod:PredName/PredArity),
write(user_error, ' is already declared as parallel ]'),
nl(user_error)
;
NewFlags is Flags /\ 0xffffffdf,
'$flags'(PredFunctor,Mod,Flags,NewFlags)
)
'$execute'(Goal),
fail
;
write(user_error, '[ Error: '),
write(user_error, Mod:PredName/PredArity),
write(user_error, ' cannot be declared as parallel ]'),
nl(user_error),
fail
true
).
'$parallel_directive'(Pred,Mod) :-
'$do_error'(type_error(callable,Mod:Pred),parallel(Mod:Pred)).
'$parallel_query'(Goal) :-
'$c_yapor_start',
'$execute'(Goal),
fail.
'$parallel_query'(_).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% $parallelizable/1 %%
%% parallel_findall/3 %%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
'$parallelizable'(_) :-
nb_getval('$consulting_file',S), S\=[], !, fail.
'$parallelizable'((G1,G2)) :- !,
'$parallelizable'(G1),
'$parallelizable'(G2).
'$parallelizable'((G1;G2)) :- !,
'$parallelizable'(G1),
'$parallelizable'(G2).
'$parallelizable'((G1|G2)) :- !,
'$parallelizable'(G1),
'$parallelizable'(G2).
'$parallelizable'((G1->G2)) :- !,
'$parallelizable'(G1),
'$parallelizable'(G2).
'$parallelizable'([]) :- !, fail.
'$parallelizable'([_|_]) :- !, fail.
'$parallelizable'(consult(_)) :- !, fail.
'$parallelizable'(reconsult(_)) :- !, fail.
'$parallelizable'(compile(_)) :- !, fail.
'$parallelizable'(use_module(_)) :- !, fail.
'$parallelizable'(_).
parallel_findall(Template,Goal,Answers) :-
parallel_mode(Mode), Mode = on, !,
(
'$parallel_findall_query'(Template,Goal)
;
findall(X,'$parallel_findall_recorded'(X), Answers)
).
parallel_findall(Template,Goal,Answers) :-
findall(Template,Goal,Answers).
'$parallel_findall_query'(Template,Goal) :-
'$c_yapor_start',
'$execute'(Goal),
recordz(parallel_findall,Template,_),
%% '$c_parallel_new_answer'(Ref),
fail.
'$parallel_findall_query'(_,_).
'$parallel_findall_recorded'(Template) :-
recorded(parallel_findall,Template,Ref),
erase(Ref).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% parallel_once/1 %%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
parallel_once(Goal) :-
parallel_mode(Mode), Mode = on, !,
(
'$parallel_once_query'(Goal)
;
recorded(parallel_once,Goal,Ref),
erase(Ref)
).
parallel_once(Goal) :-
once(Goal).
'$parallel_once_query'(Goal) :-
'$c_yapor_start',
'$execute'(Goal), !,
recordz(parallel_once,Goal,_),
fail.
'$parallel_once_query'(_).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%