This repository has been archived on 2023-08-20. You can view files and clone it, but cannot push or open issues or pull requests.
yap-6.3/packages/pyswip/pyswip/easy.py
2010-06-01 00:33:32 +01:00

477 lines
13 KiB
Python

# -*- coding: utf-8 -*-
# pyswip.easy -- PySWIP helper functions
# (c) 2006-2007 Yüce TEKOL
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
from pyswip.core import *
class InvalidTypeError(TypeError):
    """Raised when a Prolog term does not have the expected type.

    The first positional argument, when present and truthy, names the
    expected type; otherwise the name "Unknown" is used. The formatted
    message becomes ``args[0]`` of the exception and the original
    arguments follow it.
    """

    def __init__(self, *args):
        # Fall back to "Unknown" both when no argument was given and when the
        # first argument is falsy (e.g. an empty string).
        expected = (args[0] if args else None) or "Unknown"
        # Wrap in a 1-tuple so a tuple-valued ``expected`` is formatted
        # instead of being unpacked by the % operator.
        msg = "Term is expected to be of type: '%s'" % (expected,)
        TypeError.__init__(self, msg, *args)
class Atom(object):
    """Python-side wrapper pairing a Prolog atom handle with its text.

    Attributes:
        handle: the atom handle passed to the ``PL_*`` functions.
        chars:  the atom's text.
    """

    __slots__ = "handle", "chars"

    def __init__(self, handleOrChars):
        """Create an atom.

        ``handleOrChars``: handle or string of the atom.

        A string is turned into a handle with ``PL_new_atom``. Anything else
        is treated as an existing handle: it is registered with
        ``PL_register_atom`` and its text is read with ``PL_atom_chars``.
        """
        if isinstance(handleOrChars, basestring):
            self.handle = PL_new_atom(handleOrChars)
            self.chars = handleOrChars
        else:
            self.handle = handleOrChars
            PL_register_atom(self.handle)
            self.chars = c_char_p(PL_atom_chars(self.handle)).value

    @classmethod
    def fromTerm(cls, term):
        """Create an atom from a Term or term handle.

        Returns ``None`` when ``PL_get_atom`` reports that the term is not
        an atom.
        """
        if isinstance(term, Term):
            term = term.handle
        a = atom_t()
        # Pass the out-parameter with byref(), like the other getters in this
        # module. addressof() yields a plain Python integer, which ctypes
        # converts as a C int by default and can therefore truncate a
        # pointer on 64-bit platforms.
        if PL_get_atom(term, byref(a)):
            return cls(a.value)
        return None

    def __del__(self):
        # Release this wrapper's registration of the atom handle.
        PL_unregister_atom(self.handle)

    # Read-only alias for ``chars``.
    value = property(lambda s: s.chars)

    def __str__(self):
        """Return the atom text, or the repr when no text is available."""
        if self.chars is not None:
            return self.chars
        return self.__repr__()

    def __repr__(self):
        """Return ``Atom('<handle>')``."""
        return "Atom('%s')" % (self.handle,)
class Term(object):
    """Thin wrapper around a Prolog term handle.

    Attributes:
        handle: the wrapped term handle.
        chars:  always initialised to ``None`` by the constructor.
        a0:     stored verbatim from the constructor argument.
    """

    __slots__ = "handle", "chars", "__value", "a0"

    def __init__(self, handle=None, a0=None):
        # A falsy handle (None or 0) means "allocate a fresh term reference".
        if not handle:
            handle = PL_new_term_ref()
        self.handle = handle
        self.chars = None
        self.a0 = a0

    def __invert__(self):
        """``~term`` builds a term by applying the module-level ``_not`` functor."""
        return _not(self)

    def get_value(self):
        """Placeholder; currently returns ``None``."""
        return None
class Variable(object):
    """Wrapper around a term handle that is used as a Prolog variable.

    Attributes:
        handle: the wrapped term handle.
        chars:  a display name for the variable, or ``None``.
    """

    __slots__ = "handle", "chars"

    def __init__(self, handle=None, name=None):
        """Wrap ``handle``, or allocate a new term reference when it is falsy.

        ``name`` (when truthy) becomes ``chars``. For an existing handle the
        text returned by ``PL_get_chars`` overrides it when that call succeeds.
        """
        self.chars = None
        if name:
            self.chars = name
        if handle:
            self.handle = handle
            s = create_string_buffer("\00"*64)  # FIXME:
            ptr = cast(s, c_char_p)
            if PL_get_chars(handle, byref(ptr), CVT_VARIABLE|BUF_RING):
                self.chars = ptr.value
        else:
            self.handle = PL_new_term_ref()

    def unify(self, value):
        """Unify this variable's term with a Python value.

        Supported value types are ``bool``, ``str``, ``int``, ``float`` and
        ``list``; anything else raises ``TypeError``.
        """
        # bool is tested first because it is a subclass of int.
        if isinstance(value, bool):
            fun = PL_unify_bool
        elif isinstance(value, str):
            fun = PL_unify_atom_chars
        elif isinstance(value, int):
            fun = PL_unify_integer
        elif isinstance(value, float):
            fun = PL_unify_float
        elif isinstance(value, list):
            # NOTE(review): the SWI-Prolog C API documents PL_unify_list as
            # taking (list, head, tail); this two-argument call is kept as it
            # was but looks unlikely to work -- confirm.
            fun = PL_unify_list
        else:
            raise TypeError(
                "Cannot unify a Variable with a value of type '%s'"
                % type(value).__name__)
        fun(self.handle, value)

    def get_value(self):
        """Return the Python representation of the term via ``getTerm``."""
        return getTerm(self.handle)

    # Reading converts the term with getTerm; assigning unifies.
    value = property(get_value, unify)

    def unified(self):
        """Return whether ``PL_term_type`` reports the handle as ``PL_VARIABLE``.

        NOTE(review): despite the name, this appears to be true while the term
        is still a variable -- confirm the intended meaning.
        """
        return PL_term_type(self.handle) == PL_VARIABLE

    def __str__(self):
        """Return the variable name, or the repr when there is none."""
        if self.chars is not None:
            return self.chars
        return self.__repr__()

    def __repr__(self):
        return "Variable(%s)" % self.handle

    def put(self, term):
        """Make this wrapper refer to the term handle ``term``."""
        self.handle = term
class Functor(object):
    """Wrapper around a Prolog functor handle (a name plus an arity).

    Calling an instance with ``arity`` arguments builds a ``Term``.
    """

    __slots__ = "handle", "name", "arity", "args", "__value", "a0"

    # Maps functor handles to callables ``f(arity, *args)``; consulted when a
    # Functor is constructed from a raw handle to compute its ``value``.
    func = {}

    def __init__(self, handleOrName, arity=1, args=None, a0=None):
        """Create a functor.

        ``handleOrName``: functor handle, a string or an atom.
        """
        self.args = args or []
        self.arity = arity
        self.a0 = a0

        # A plain string is first promoted to an Atom, after which both
        # "named" cases share the same construction path.
        if isinstance(handleOrName, basestring):
            handleOrName = Atom(handleOrName)
        if isinstance(handleOrName, Atom):
            self.name = handleOrName
            self.handle = PL_new_functor(self.name.handle, arity)
            self.__value = "Functor%d" % self.handle
            return

        # Raw handle: name and arity are read back from Prolog.
        self.handle = handleOrName
        self.name = Atom(PL_functor_name(self.handle))
        self.arity = PL_functor_arity(self.handle)
        try:
            self.__value = self.func[self.handle](self.arity, *self.args)
        except KeyError:
            self.__value = "Functor%d" % self.handle

    @classmethod
    def fromTerm(cls, term):
        """Create a functor from a Term or term handle.

        Returns ``None`` when ``PL_get_functor`` fails for the term.
        """
        if isinstance(term, Term):
            term = term.handle
        f = functor_t()
        if not PL_get_functor(term, byref(f)):
            return None
        arity = PL_functor_arity(f.value)
        # Allocate all argument references in one call so they are consecutive.
        a0 = PL_new_term_refs(arity)
        args = []
        for offset in range(arity):
            ref = a0 + offset
            # Prolog argument positions are 1-based.
            if PL_get_arg(offset + 1, term, ref):
                args.append(getTerm(ref))
        return cls(f.value, args=args, a0=a0)

    @property
    def value(self):
        """The value computed at construction time."""
        return self.__value

    def __call__(self, *args):
        """Build the compound term ``functor(args...)`` and return it as a Term."""
        count = len(args)
        assert self.arity == count
        first = PL_new_term_refs(count)
        for offset, arg in enumerate(args):
            putTerm(first + offset, arg)
        t = PL_new_term_ref()
        PL_cons_functor_v(t, self.handle, first)
        return Term(t)

    def __str__(self):
        """Return ``name(arity)``, or the repr when either part is missing."""
        if self.name is None or self.arity is None:
            return self.__repr__()
        return "%s(%d)" % (self.name, self.arity)

    def __repr__(self):
        """Return ``Functor(handle,arity[,args...])``."""
        fields = [self.handle, self.arity] + self.args
        return "Functor(" + ",".join([str(x) for x in fields]) + ")"
def _unifier(arity, *args):
    """Turn the two arguments of a ``=``/2 term into a one-entry dict.

    The key is the ``chars`` of the first argument. The value is the
    ``value`` attribute of the second argument when it has one, otherwise
    the second argument itself.
    """
    assert arity == 2
    target, bound = args[0], args[1]
    try:
        return {target.chars: bound.value}
    except AttributeError:
        # ``bound`` is a plain Python value without a ``value`` attribute.
        return {target.chars: bound}
# Functor objects for the Prolog operators this module relies on.
# "="/2 is registered in Functor.func so that a Functor created from that
# handle gets its value computed by _unifier.
_unify = Functor("=", 2)
Functor.func[_unify.handle] = _unifier
# "not"/1 backs Term.__invert__; ","/2 is used by call() and Query to chain
# several goals into a single conjunction.
_not = Functor("not", 1)
_comma = Functor(",", 2)
def putTerm(term, value):
    """Store the Python object ``value`` into the term reference ``term``.

    Supported values: ``Term``, strings (stored as atoms), ``int``,
    ``Variable``, ``list``, ``Atom`` and ``Functor``. Any other type raises
    ``Exception("Not implemented")``.
    """
    if isinstance(value, Term):
        PL_put_term(term, value.handle)
    elif isinstance(value, basestring):
        PL_put_atom_chars(term, value)
    elif isinstance(value, int):
        PL_put_integer(term, value)
    elif isinstance(value, Variable):
        value.put(term)
    elif isinstance(value, list):
        putList(term, value)
    elif isinstance(value, Atom):
        # This branch used to print a debug marker and leave ``term``
        # untouched; store the atom by its text, like the string branch.
        # NOTE(review): assumes the atom is a text atom whose ``chars``
        # identifies it -- confirm for non-text atoms.
        PL_put_atom_chars(term, value.chars)
    elif isinstance(value, Functor):
        PL_put_functor(term, value.handle)
    else:
        raise Exception("Not implemented")
def putList(l, ls):
    """Build a Prolog list from the Python sequence ``ls`` in term ref ``l``.

    The list starts out as nil and the items are consed on from last to
    first, so the resulting list keeps the order of ``ls``.
    """
    PL_put_nil(l)
    # One scratch reference is reused for every element.
    element_ref = PL_new_term_ref()
    for element in reversed(ls):
        putTerm(element_ref, element)
        PL_cons_list(l, element_ref, l)
# deprecated
def getAtomChars(t):
    """If t is an atom, return it as a string, otherwise raise InvalidTypeError.
    """
    s = c_char_p()
    # byref() passes a real pointer to ``s``, matching the other getters in
    # this module. addressof() would hand ctypes a plain Python integer,
    # which is converted as a C int by default and can truncate the address
    # on 64-bit platforms.
    if PL_get_atom_chars(t, byref(s)):
        return s.value
    raise InvalidTypeError("atom")
def getAtom(t):
    """If t is an atom, return it, otherwise raise InvalidTypeError.
    """
    atom = Atom.fromTerm(t)
    # Atom.fromTerm signals "not an atom" by returning None; turn that into
    # the exception this function documents, like the other get* helpers.
    if atom is None:
        raise InvalidTypeError("atom")
    return atom
def getBool(t):
    """Return the term as a bool, or raise InvalidTypeError.

    The term is read with ``PL_get_long`` and the result converted with
    ``bool()``.
    """
    raw = c_long()
    if not PL_get_long(t, byref(raw)):
        raise InvalidTypeError("bool")
    return bool(raw.value)
def getLong(t):
    """Return the term as an integer read with ``PL_get_long``.

    Raises InvalidTypeError when the call reports failure.
    """
    number = c_long()
    if not PL_get_long(t, byref(number)):
        raise InvalidTypeError("long")
    return number.value


# getInteger is simply another name for getLong.
getInteger = getLong
def getFloat(t):
    """Return the term as a float read with ``PL_get_float``.

    Raises InvalidTypeError when the call reports failure.
    """
    number = c_double()
    if not PL_get_float(t, byref(number)):
        raise InvalidTypeError("float")
    return number.value
def getString(t):
    """Return the term as a string read with ``PL_get_string_chars``.

    Raises InvalidTypeError when the call reports failure. The length
    reported by Prolog is received but not used.
    """
    length = c_int()
    text = c_char_p()
    if not PL_get_string_chars(t, byref(text), byref(length)):
        raise InvalidTypeError("string")
    return text.value
def getTerm(t):
    """Convert the term handle ``t`` into a Python object.

    Type codes below ``PL_TERM`` are dispatched through ``_getterm_router``.
    Anything else is converted with ``getList`` when ``PL_is_list`` says it
    is a list, and with ``getFunctor`` otherwise.
    """
    kind = PL_term_type(t)
    if kind < PL_TERM:
        converter = _getterm_router[kind]
        return converter(t)
    if PL_is_list(t):
        return getList(t)
    return getFunctor(t)
def getList(x):
    """Return the Prolog list ``x`` as a Python list.

    Works on a copy of the term reference, so ``x`` itself is not advanced
    while the list is walked. Each element is converted with ``getTerm``.
    """
    cursor = PL_copy_term_ref(x)
    head = PL_new_term_ref()
    items = []
    # Each successful PL_get_list call fills ``head`` and moves ``cursor``
    # on to the tail.
    while PL_get_list(cursor, head, cursor):
        items.append(getTerm(head))
    return items
def getFunctor(t):
    """Convert the term handle ``t`` with ``Functor.fromTerm`` and return the result."""
    functor = Functor.fromTerm(t)
    return functor
def getVariable(t):
    """Wrap the term handle ``t`` in a ``Variable``."""
    variable = Variable(t)
    return variable
# Dispatch table used by getTerm(): maps a PL_term_type() code to the
# converter for that kind of term. getTerm() only consults this table for
# codes below PL_TERM, so the PL_TERM entry is not reached from there.
_getterm_router = {
    PL_VARIABLE:getVariable, PL_ATOM:getAtom, PL_STRING:getString,
    PL_INTEGER:getInteger, PL_FLOAT:getFloat,
    PL_TERM:getTerm
    }
def _callbackWrapper(arity=1):
    """Return the ctypes function type for a foreign predicate.

    The type returns ``foreign_t`` and takes ``arity`` arguments of type
    ``term_t``.
    """
    argument_types = [term_t] * arity
    return CFUNCTYPE(foreign_t, *argument_types)
def _foreignWrapper(fun):
    """Adapt ``fun`` so it can be called with raw term handles.

    The returned wrapper converts every argument with ``getTerm`` before
    calling ``fun``. A ``None`` result is reported as ``True``; any other
    result is passed through unchanged.
    """
    def wrapper(*args):
        converted = [getTerm(arg) for arg in args]
        result = fun(*converted)
        return True if result is None else result
    return wrapper
# ctypes callback objects must stay referenced for as long as foreign code
# may call them; registered predicates are therefore kept alive here.
_registeredCallbacks = []


def registerForeign(func, name=None, arity=None, flags=0):
    """Register a Python predicate

    ``func``: Function to be registered. The function should return a value in
    ``foreign_t``, ``True`` or ``False``.
    ``name`` : Name of the function. If this value is not used, ``func.__name__``
    (or ``func.func_name``) should exist.
    ``arity``: Arity (number of arguments) of the function. If this value is not
    used, ``func.arity`` should exist.
    ``flags``: passed through to ``PL_register_foreign``.

    Returns the result of ``PL_register_foreign``.
    """
    if arity is None:
        arity = func.arity
    if name is None:
        # __name__ exists on both Python 2 and 3 functions; func_name is kept
        # as a fallback for objects that only provide the old attribute.
        name = getattr(func, "__name__", None) or func.func_name
    callback = _callbackWrapper(arity)(_foreignWrapper(func))
    # Without this reference the callback could be garbage-collected while
    # Prolog still holds its address.
    _registeredCallbacks.append(callback)
    return PL_register_foreign(name, arity, cast(callback, c_void_p), flags)
# Alias exposing PL_new_term_ref under this module's naming style.
newTermRef = PL_new_term_ref
def newTermRefs(count):
    """Allocate ``count`` term references with one ``PL_new_term_refs`` call.

    Returns the range of handles starting at the value that call returned.
    """
    first = PL_new_term_refs(count)
    last = first + count
    return range(first, last)
def call(*terms, **kwargs):
    """Call one or more terms, joined as a conjunction, with ``PL_call``.

    ``terms``: one or more Terms; several terms are combined left to right
    with the ``,`` functor.
    ``module`` (keyword only, default ``None``): passed to ``PL_call``.

    Raises ``KeyError`` for any other keyword argument.
    """
    for keyword in kwargs:
        if keyword != "module":
            raise KeyError
    module = kwargs.get("module", None)
    goal = terms[0]
    for extra in terms[1:]:
        goal = _comma(goal, extra)
    return PL_call(goal.handle, module)
def newModule(name):
    """Create a new module with ``PL_new_module``.

    ``name``: An Atom or a string; a string is converted to an Atom first.
    """
    atom = Atom(name) if isinstance(name, basestring) else name
    return PL_new_module(atom.handle)
class Query(object):
    """Open a Prolog query and step through its solutions.

    The query id and foreign frame id are stored on the class itself
    (``Query.qid`` / ``Query.fid``), not on the instance, and the stepping
    and closing helpers are static methods operating on those.
    """

    qid = None
    fid = None

    def __init__(self, *terms, **kwargs):
        """Open a query for ``terms`` (joined with the ``,`` functor).

        Keyword arguments: ``flags`` (default
        ``PL_Q_NODEBUG|PL_Q_CATCH_EXCEPTION``) and ``module`` (default
        ``None``). Any other keyword raises ``Exception``.
        """
        for key in kwargs:
            if key not in ("flags", "module"):
                raise Exception("Invalid kwarg: %s" % key, key)
        flags = kwargs.get("flags", PL_Q_NODEBUG|PL_Q_CATCH_EXCEPTION)
        module = kwargs.get("module", None)

        goal = terms[0]
        for extra in terms[1:]:
            goal = _comma(goal, extra)

        f = Functor.fromTerm(goal)
        p = PL_pred(f.handle, module)
        Query.fid = PL_open_foreign_frame()
        Query.qid = PL_open_query(module, flags, p, f.a0)

    # Closing the query automatically from __del__ is intentionally not done:
    #    def __del__(self):
    #        self.closeQuery()

    @staticmethod
    def nextSolution():
        """Advance the open query; returns the ``PL_next_solution`` result."""
        return PL_next_solution(Query.qid)

    @staticmethod
    def cutQuery():
        """Cut the open query with ``PL_cut_query``."""
        PL_cut_query(Query.qid)

    @staticmethod
    def closeQuery():
        """Close the open query and discard its foreign frame, if any."""
        if Query.qid is None:
            return
        PL_close_query(Query.qid)
        PL_discard_foreign_frame(Query.fid)
        Query.qid = None
        Query.fid = None
def _test():
    """Smoke test: assert a few facts, then print every solution of ``a_(X)``."""
    assertz = Functor("assertz")
    a = Functor("a_")
    b = Functor("b_")

    # Facts are asserted one at a time, in this order.
    facts = ((a, 10), (a, [1,2,3]), (a, 11), (b, 11), (b, 12))
    for functor, argument in facts:
        call(assertz(functor(argument)))

    X = Variable()
    q = Query(a(X))
    while q.nextSolution():
        print("> %s" % (X.value,))
# Run the smoke test when this module is executed as a script.
if __name__ == "__main__":
    _test()