:- object(buffer).
:- threaded.
:- public([put/1, get/1]).
:- private(item/1).
:- dynamic(item/1).
put(N) :-
( N > 0 % wait until the previous item is consumed
-> NP is N - 1, threaded_wait(consumed(NP)) % (except for the first item!)
; true
),
assertz(item(N)),
sender(Sender),
writeq(Sender), write(' wrote item '), write(N), nl,
threaded_notify(produced(N)). % notify consumer that a new item is available
get(N) :-
threaded_wait(produced(N)), % wait until an item is available
retract(item(N)),
writeq(Sender), write(' read item '), write(N), nl,
threaded_notify(consumed(N)). % notify producer that the item was consumed
:- end_object.
:- object(producer).
:- public(run/1).
run(N) :-
run(0, N).
run(N, N) :- !.
run(M, N) :-
M < N,
random::random(1, 5, Random), % simulate a variable amount of
thread_sleep(Random), % time to produce a new item
buffer::put(M),
M2 is M + 1,
run(M2, N).
:- object(consumer).
thread_sleep(Random), % time to consume an item
buffer::get(M),