513 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			513 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
# pyswip.easy -- PySWIP helper functions
 | 
						|
# (c) 2006-2007 Yüce TEKOL
 | 
						|
# This program is free software; you can redistribute it and/or modify
 | 
						|
# it under the terms of the GNU General Public License as published by
 | 
						|
# the Free Software Foundation; either version 2 of the License, or
 | 
						|
# (at your option) any later version.
 | 
						|
#
 | 
						|
# This program is distributed in the hope that it will be useful,
 | 
						|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
# GNU General Public License for more details.
 | 
						|
#
 | 
						|
# You should have received a copy of the GNU General Public License
 | 
						|
# along with this program; if not, write to the Free Software
 | 
						|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
# MA 02110-1301, USA.
 | 
						|
 | 
						|
from pyswip.core import *
 | 
						|
 | 
						|
 | 
						|
class InvalidTypeError(TypeError):
    """Raised when a Prolog term does not have the expected type.

    The first positional argument, when present and truthy, names the
    expected type; otherwise the name ``"Unknown"`` is used.  The resulting
    ``args`` tuple is the formatted message followed by the original
    arguments.
    """

    def __init__(self, *args):
        # Avoid shadowing the ``type`` builtin with a local name.
        expected = args[0] if args and args[0] else "Unknown"
        # Wrap in a 1-tuple so that a tuple-valued argument is formatted as a
        # single value instead of being unpacked by the ``%`` operator.
        msg = "Term is expected to be of type: '%s'" % (expected,)
        TypeError.__init__(self, msg, *args)
 | 
						|
 | 
						|
 | 
						|
class Atom(object):
    """Python-side wrapper pairing a Prolog atom handle with its text.

    ``handle`` is the atom handle and ``chars`` its textual form.  The handle
    is released with ``PL_unregister_atom`` when the wrapper is destroyed.
    """

    __slots__ = ("handle", "chars")

    def __init__(self, handleOrChars):
        """Create an atom.

        ``handleOrChars``: handle or string of the atom.  A string is turned
        into a handle with ``PL_new_atom``; anything else is treated as an
        existing handle, registered with ``PL_register_atom`` and its text
        fetched with ``PL_atom_chars``.
        """
        if not isinstance(handleOrChars, basestring):
            self.handle = handleOrChars
            PL_register_atom(self.handle)
            self.chars = PL_atom_chars(self.handle)
            return

        self.handle = PL_new_atom(handleOrChars)
        self.chars = handleOrChars

    @classmethod
    def fromTerm(cls, term):
        """Create an atom from a Term or term handle.

        Returns ``None`` when ``PL_get_atom`` reports failure for the term.
        """
        term_handle = term.handle if isinstance(term, Term) else term

        atom_ref = atom_t()
        if not PL_get_atom(term_handle, byref(atom_ref)):
            return None
        return cls(atom_ref.value)

    def __del__(self):
        PL_unregister_atom(self.handle)

    @property
    def value(self):
        """The atom's text (same as ``chars``)."""
        return self.chars

    def __str__(self):
        if self.chars is None:
            return self.__repr__()
        return self.chars

    def __repr__(self):
        return "Atom('" + str(self.handle) + "')"
 | 
						|
 | 
						|
class Term(object):
    """Thin wrapper around a Prolog term reference.

    ``handle`` is the term reference; a fresh one is obtained from
    ``PL_new_term_ref`` when no (or a falsy) handle is given.  ``a0`` is kept
    as passed in and ``chars`` starts out as ``None``.
    """

    __slots__ = ("handle", "chars", "__value", "a0")

    def __init__(self, handle=None, a0=None):
        # The given handle is stored as-is (it is not copied).
        self.handle = handle if handle else PL_new_term_ref()
        self.chars = None
        self.a0 = a0

    def __invert__(self):
        """``~term``: apply the module-level ``_not`` functor to this term."""
        return _not(self)

    def get_value(self):
        """Placeholder; always returns ``None``."""
        return None
 | 
						|
 | 
						|
class Variable(object):
    """Wrapper around a term reference that stands for a Prolog variable.

    ``handle`` is the term reference and ``chars`` an optional display name.
    Reading ``value`` converts the referenced term with ``getTerm``; assigning
    to ``value`` calls ``unify``.
    """

    __slots__ = ("handle", "chars")

    def __init__(self, handle=None, name=None):
        """Wrap ``handle``, or allocate a new term reference when it is falsy.

        ``name``: optional display name.  When a handle is given and
        ``PL_get_chars`` succeeds with ``CVT_VARIABLE``, the text it returns
        replaces the name.
        """
        self.chars = None
        if name:
            self.chars = name
        if handle:
            self.handle = handle
            s = create_string_buffer("\00"*64)  # FIXME: fixed-size scratch buffer
            ptr = cast(s, c_char_p)
            if PL_get_chars(handle, byref(ptr), CVT_VARIABLE|BUF_RING):
                self.chars = ptr.value
        else:
            self.handle = PL_new_term_ref()

    def unify(self, value):
        """Unify this variable with the Python object ``value``.

        The unification function is chosen from the exact type of ``value``
        (``str``, ``int``, ``bool``, ``float`` or ``list``).

        Raises ``TypeError`` for any other type.
        """
        # Exact type comparison is deliberate: ``bool`` is a subclass of
        # ``int`` and must not be routed to the integer branch.
        kind = type(value)
        if kind is str:
            fun = PL_unify_atom_chars
        elif kind is int:
            fun = PL_unify_integer
        elif kind is bool:
            fun = PL_unify_bool
        elif kind is float:
            fun = PL_unify_float
        elif kind is list:
            # NOTE(review): this passes the Python list straight to
            # PL_unify_list -- confirm that the binding accepts that.
            fun = PL_unify_list
        else:
            # A bare ``raise`` here had no active exception to re-raise.
            raise TypeError(
                "cannot unify a Variable with a value of type '%s'"
                % kind.__name__)

        if self.handle is None:
            self.handle = PL_new_term_ref()
        fun(self.handle, value)

    def get_value(self):
        """Return the referenced term converted with ``getTerm``."""
        return getTerm(self.handle)

    value = property(get_value, unify)

    def unified(self):
        """Return whether ``PL_term_type`` reports ``PL_VARIABLE`` for the handle.

        NOTE(review): this is true while the term is still a variable, which
        reads like the opposite of the method name -- confirm with callers
        before changing it.
        """
        return PL_term_type(self.handle) == PL_VARIABLE

    def __str__(self):
        if self.chars is not None:
            return self.chars
        return self.__repr__()

    def __repr__(self):
        return "Variable(%s)" % self.handle

    def put(self, term):
        """Re-point this variable at the term reference ``term``.

        Only ``self.handle`` changes; the term itself is not written to.
        """
        self.handle = term
 | 
						|
 | 
						|
class Functor(object):
    """Wrapper around a Prolog functor (name/arity pair).

    ``func`` is a class-level registry mapping functor handles to callables.
    When a Functor is built from a handle, the callable registered for that
    handle is invoked as ``callable(arity, *args)`` to compute ``value``.
    Calling a Functor instance builds a compound ``Term`` from its arguments.
    """

    __slots__ = ("handle", "name", "arity", "args", "__value", "a0")
    func = {}

    def __init__(self, handleOrName, arity=1, args=None, a0=None):
        """Create a functor.

        ``handleOrName``: functor handle, a string or an atom.
        ``arity``: used when a name is given; when a handle is given the
        arity is read back with ``PL_functor_arity`` instead.
        ``args`` / ``a0``: stored as given (``args`` defaults to ``[]``).
        """
        self.args = args or []
        self.arity = arity
        self.a0 = a0

        if isinstance(handleOrName, (basestring, Atom)):
            # Named functor: wrap plain text in an Atom first.
            if isinstance(handleOrName, Atom):
                self.name = handleOrName
            else:
                self.name = Atom(handleOrName)
            self.handle = PL_new_functor(self.name.handle, arity)
            self.__value = "Functor%d" % self.handle
            return

        # Existing functor handle: recover name and arity from Prolog.
        self.handle = handleOrName
        self.name = Atom(PL_functor_name(self.handle))
        self.arity = PL_functor_arity(self.handle)
        try:
            evaluator = self.func[self.handle]
            self.__value = evaluator(self.arity, *self.args)
        except KeyError:
            # Nothing registered for this handle (a KeyError raised by the
            # evaluator itself lands here as well).
            self.__value = "Functor%d" % self.handle

    @classmethod
    def fromTerm(cls, term):
        """Create a functor from a Term or term handle.

        Returns ``None`` when ``PL_get_functor`` reports failure.
        """
        if isinstance(term, Term):
            term = term.handle

        functor_ref = functor_t()
        if not PL_get_functor(term, byref(functor_ref)):
            return None

        arity = PL_functor_arity(functor_ref.value)
        # One consecutive run of term references holds all the arguments.
        a0 = PL_new_term_refs(arity)
        args = [
            getTerm(a0 + offset)
            for offset in range(arity)
            if PL_get_arg(offset + 1, term, a0 + offset)
        ]
        return cls(functor_ref.value, args=args, a0=a0)

    @property
    def value(self):
        """The value computed when the functor was constructed."""
        return self.__value

    def __call__(self, *args):
        """Build the compound term ``functor(args...)`` and return it as a Term."""
        assert self.arity == len(args)
        first_ref = PL_new_term_refs(len(args))
        for position, argument in enumerate(args):
            putTerm(first_ref + position, argument)

        compound = PL_new_term_ref()
        PL_cons_functor_v(compound, self.handle, first_ref)
        return Term(compound)

    def __str__(self):
        if self.name is None or self.arity is None:
            return self.__repr__()
        return "%s(%d)" % (self.name, self.arity)

    def __repr__(self):
        parts = [self.handle, self.arity] + self.args
        return "Functor(%s)" % ",".join(map(str, parts))
 | 
						|
 | 
						|
def _unifier(arity, *args):
    """Evaluate a ``=/2`` functor into a one-entry dict.

    The key is ``args[0].chars``; the value is ``args[1].value`` when the
    second argument has a ``value`` attribute, otherwise ``args[1]`` itself.

    Raises ``ValueError`` when ``arity`` is not 2.
    """
    # Explicit check instead of ``assert`` so it is not stripped under -O.
    if arity != 2:
        raise ValueError("_unifier expects arity 2, got %r" % (arity,))

    key = args[0].chars
    try:
        return {key: args[1].value}
    except AttributeError:
        return {key: args[1]}
 | 
						|
 | 
						|
# Functors used internally by this module: unification, negation and
# conjunction.
_unify = Functor("=", 2)
_not = Functor("not", 1)
_comma = Functor(",", 2)

# Functors built from the handle of =/2 get their value from _unifier.
Functor.func[_unify.handle] = _unifier
 | 
						|
 | 
						|
def putTerm(term, value):
    """Store the Python object ``value`` into the term reference ``term``.

    Handling by type of ``value``:

    * ``Term``     -- ``PL_put_term`` with the term's handle
    * string       -- ``PL_put_atom_chars``
    * ``int``      -- ``PL_put_integer``
    * ``Variable`` -- the variable is re-pointed at ``term`` (``value.put``)
    * ``list``     -- ``putList``
    * ``Atom``     -- ``PL_put_atom_chars`` with the atom's text
    * ``Functor``  -- ``PL_put_functor`` with the functor's handle

    Raises ``Exception("Not implemented")`` for any other type.
    """
    if isinstance(value, Term):
        PL_put_term(term, value.handle)
    elif isinstance(value, basestring):
        PL_put_atom_chars(term, value)
    elif isinstance(value, int):
        PL_put_integer(term, value)
    elif isinstance(value, Variable):
        value.put(term)
    elif isinstance(value, list):
        putList(term, value)
    elif isinstance(value, Atom):
        # This branch used to print a debug marker and leave ``term`` unset;
        # write the atom through its text instead.
        PL_put_atom_chars(term, value.chars)
    elif isinstance(value, Functor):
        PL_put_functor(term, value.handle)
    else:
        raise Exception("Not implemented")
 | 
						|
 | 
						|
def putList(l, ls):
    """Build a Prolog list from the sequence ``ls`` in the term reference ``l``.

    The list is assembled back to front: ``l`` starts as nil and every item
    of ``ls``, taken in reverse order, is consed onto its head.
    """
    PL_put_nil(l)
    # One scratch term reference is reused for every element.
    element = PL_new_term_ref()
    for item in reversed(ls):
        putTerm(element, item)
        PL_cons_list(l, element, l)
 | 
						|
 | 
						|
# deprecated
def getAtomChars(t):
    """Return the text of atom ``t`` as a string.

    Raises ``InvalidTypeError`` when ``PL_get_atom_chars`` fails for ``t``.
    """
    text = c_char_p()
    if not PL_get_atom_chars(t, byref(text)):
        raise InvalidTypeError("atom")
    return text.value
 | 
						|
 | 
						|
def getAtom(t):
    """If t is an atom, return it as an ``Atom``, otherwise raise InvalidTypeError.

    ``Atom.fromTerm`` returns ``None`` for a term that is not an atom; that
    case is turned into the documented exception so this getter behaves like
    the other ``get*`` functions.
    """
    atom = Atom.fromTerm(t)
    if atom is None:
        raise InvalidTypeError("atom")
    return atom
 | 
						|
 | 
						|
def getBool(t):
    """Return term ``t`` as a ``bool``.

    The term is read with ``PL_get_int`` and the integer converted with
    ``bool()``.  Raises ``InvalidTypeError`` when the read fails.
    """
    raw = c_int()
    if not PL_get_int(t, byref(raw)):
        raise InvalidTypeError("bool")
    return bool(raw.value)
 | 
						|
 | 
						|
def getLong(t):
    """Return term ``t`` as an integer read with ``PL_get_long``.

    Raises ``InvalidTypeError`` when the read fails.
    """
    number = c_long()
    if not PL_get_long(t, byref(number)):
        raise InvalidTypeError("long")
    return number.value


getInteger = getLong  # just an alias for getLong
 | 
						|
 | 
						|
def getFloat(t):
    """Return term ``t`` as a float read with ``PL_get_float``.

    Raises ``InvalidTypeError`` when the read fails.
    """
    number = c_double()
    if not PL_get_float(t, byref(number)):
        raise InvalidTypeError("float")
    return number.value
 | 
						|
 | 
						|
def getString(t):
    """Return term ``t`` as a string read with ``PL_get_string_chars``.

    Raises ``InvalidTypeError`` when the read fails.
    """
    # The length is received from the call but not used afterwards.
    length = c_int()
    text = c_char_p()
    if not PL_get_string_chars(t, byref(text), byref(length)):
        raise InvalidTypeError("string")
    return text.value
 | 
						|
 | 
						|
# Last converted value for every term reference passed to getTerm.
mappedTerms = {}


def getTerm(t):
    """Convert the term referenced by ``t`` into a Python-side object.

    Term types below ``PL_TERM`` are dispatched through ``_getterm_router``;
    anything else is converted with ``getList`` when ``PL_is_list`` holds and
    with ``getFunctor`` otherwise.  The result is also recorded in
    ``mappedTerms`` under ``t``.
    """
    kind = PL_term_type(t)
    if kind < PL_TERM:
        converter = _getterm_router[kind]
        res = converter(t)
    elif PL_is_list(t):
        res = getList(t)
    else:
        res = getFunctor(t)

    # NOTE(review): entries are only ever written here, never read or removed.
    mappedTerms[t] = res
    return res
 | 
						|
 | 
						|
def getList(x):
    """Return the Prolog list referenced by ``x`` as a Python list.

    Each element is converted with ``getTerm``.
    """
    # Walk a copy of the reference so the caller's reference is not advanced.
    tail = PL_copy_term_ref(x)
    head = PL_new_term_ref()
    items = []
    while True:
        if not PL_get_list(tail, head, tail):
            break
        items.append(getTerm(head))
    return items
 | 
						|
 | 
						|
def getFunctor(t):
    """Return term ``t`` as a ``Functor`` built by ``Functor.fromTerm``."""
    functor = Functor.fromTerm(t)
    return functor
 | 
						|
 | 
						|
def getVariable(t):
    """Wrap the term reference ``t`` in a ``Variable``."""
    variable = Variable(t)
    return variable
 | 
						|
 | 
						|
# Converter for each Prolog term type; getTerm consults this table for
# types below PL_TERM.
_getterm_router = {
    PL_VARIABLE: getVariable,
    PL_ATOM: getAtom,
    PL_STRING: getString,
    PL_INTEGER: getInteger,
    PL_FLOAT: getFloat,
    PL_TERM: getTerm,
}
 | 
						|
 | 
						|
# Cache of ctypes callback prototypes, keyed by arity.
arities = {}


def _callbackWrapper(arity=1):
    """Return the ctypes prototype for a foreign predicate of ``arity``.

    The prototype returns ``foreign_t`` and takes ``arity`` arguments of type
    ``term_t``.  Prototypes are created once per arity and cached in
    ``arities``.
    """
    prototype = arities.get(arity)
    if prototype is not None:
        return prototype

    prototype = CFUNCTYPE(foreign_t, *([term_t] * arity))
    arities[arity] = prototype
    return prototype
 | 
						|
 | 
						|
# Cache of Python-level wrappers, keyed by the wrapped function.
funwraps = {}


def _foreignWrapper(fun):
    """Return a wrapper that calls ``fun`` with converted Prolog arguments.

    The wrapper converts every argument with ``getTerm`` before calling
    ``fun`` and maps a ``None`` result to ``True``; any other result is
    returned unchanged.  One wrapper is created per function and cached in
    ``funwraps``.
    """
    cached = funwraps.get(fun)
    if cached is not None:
        return cached

    def wrapper(*args):
        converted = [getTerm(arg) for arg in args]
        outcome = fun(*converted)
        return True if outcome is None else outcome

    funwraps[fun] = wrapper
    return wrapper
 | 
						|
 | 
						|
# Every ctypes callback handed to Prolog is appended here -- presumably so
# the callback objects stay referenced while Prolog holds their address.
cwraps = []


def registerForeign(func, name=None, arity=None, flags=0):
    """Register a Python predicate

    ``func``: Function to be registered. The function should return a value in
    ``foreign_t``, ``True`` or ``False``.
    ``name`` : Name of the function. If this value is not used, ``func.func_name``
    should exist.
    ``arity``: Arity (number of arguments) of the function. If this value is not
    used, ``func.arity`` should exist.
    ``flags``: passed through to ``PL_register_foreign``.

    Returns the result of ``PL_register_foreign``.
    """
    if arity is None:
        arity = func.arity
    if name is None:
        name = func.func_name

    prototype = _callbackWrapper(arity)
    callback = prototype(_foreignWrapper(func))
    cwraps.append(callback)
    return PL_register_foreign(name, arity, cast(callback, c_void_p), flags)
 | 
						|
 | 
						|
newTermRef = PL_new_term_ref


def newTermRefs(count):
    """Allocate ``count`` term references with one ``PL_new_term_refs`` call.

    Returns the range of consecutive reference values starting at the value
    returned by that call.
    """
    first = PL_new_term_refs(count)
    last = first + count
    return range(first, last)
 | 
						|
 | 
						|
def call(*terms, **kwargs):
    """Call the conjunction of ``terms`` with ``PL_call``.

    ``terms``: one or more Terms; several terms are joined pairwise with the
    ``,`` functor, left to right.
    ``module`` (keyword only): module passed to ``PL_call``; defaults to
    ``None``.  Any other keyword raises ``KeyError``.
    """
    for kwarg in kwargs:
        if kwarg != "module":
            raise KeyError

    module = kwargs.get("module", None)

    goal = terms[0]
    for extra in terms[1:]:
        goal = _comma(goal, extra)

    return PL_call(goal.handle, module)
 | 
						|
 | 
						|
def newModule(name):
    """Create a new module.

    ``name``: An Atom or a string; a string is wrapped in an ``Atom`` first.
    Returns the result of ``PL_new_module``.
    """
    atom = Atom(name) if isinstance(name, basestring) else name
    return PL_new_module(atom.handle)
 | 
						|
 | 
						|
class Query(object):
    """Open a Prolog query over one or more terms.

    The id of the open query is stored on the class (``Query.qid``), not on
    the instance, and the static methods below all act on that shared id.
    """

    qid = None
    fid = None

    def __init__(self, *terms, **kwargs):
        """Open a query for the conjunction of ``terms``.

        Keyword arguments: ``flags`` (defaults to
        ``PL_Q_NODEBUG|PL_Q_CATCH_EXCEPTION``) and ``module`` (defaults to
        ``None``).  Any other keyword raises ``Exception``.
        """
        allowed = ["flags", "module"]
        for key in kwargs:
            if key not in allowed:
                raise Exception("Invalid kwarg: %s" % key, key)

        flags = kwargs.get("flags", PL_Q_NODEBUG|PL_Q_CATCH_EXCEPTION)
        module = kwargs.get("module", None)

        # Several terms are joined pairwise with the ',' functor.
        goal = terms[0]
        for extra in terms[1:]:
            goal = _comma(goal, extra)

        functor = Functor.fromTerm(goal)
        predicate = PL_pred(functor.handle, module)
        Query.qid = PL_open_query(module, flags, predicate, functor.a0)

    # A __del__ that calls closeQuery() was left disabled in the original.

    @staticmethod
    def nextSolution():
        """Return the result of ``PL_next_solution`` for the open query."""
        return PL_next_solution(Query.qid)

    @staticmethod
    def cutQuery():
        """Call ``PL_cut_query`` on the open query."""
        PL_cut_query(Query.qid)

    @staticmethod
    def closeQuery():
        """Close the open query, if any, and reset ``Query.qid`` to ``None``."""
        if Query.qid is None:
            return
        PL_close_query(Query.qid)
        Query.qid = None
 | 
						|
 | 
						|
 | 
						|
def _test():
    """Smoke test: assert a few facts, then print every solution of ``a_(X)``."""
    assertz = Functor("assertz")
    a = Functor("a_")
    b = Functor("b_")

    # Facts are asserted one at a time, in this order.
    facts = ((a, 10), (a, [1, 2, 3]), (a, 11), (b, 11), (b, 12))
    for functor, argument in facts:
        call(assertz(functor(argument)))

    X = Variable()

    q = Query(a(X))
    while q.nextSolution():
        print("> %s" % (X.value,))


if __name__ == "__main__":
    _test()
 | 
						|
 |