799 lines
22 KiB
C++
799 lines
22 KiB
C++
/*************************************************************************
|
|
* *
|
|
* YAP Prolog *
|
|
* *
|
|
* Yap Prolog was developed at NCCUP - Universidade do Porto *
|
|
* *
|
|
* Copyright L.Damas, V.S.Costa and Universidade do Porto 1985-1997 *
|
|
* *
|
|
**************************************************************************
|
|
* *
|
|
* File: myddas_postgres.c *
|
|
* Last rev: 22/03/05 *
|
|
* mods: *
|
|
* comments: Predicates for comunicating with a pq database system *
|
|
* *
|
|
*************************************************************************/
|
|
|
|
#ifdef MYDDAS_POSTGRES
|
|
|
|
#include <libpq-fe.h>
|
|
|
|
#include "Yap.h"
|
|
#include "Yatom.h"
|
|
#include "YapText.h"
|
|
#include "cut_c.h"
|
|
#include "eval.h"
|
|
#include "myddas.h"
|
|
#ifdef MYDDAS_STATS
|
|
#include "myddas_structs.h"
|
|
#include "myddas_statistics.h"
|
|
#endif
|
|
#include "myddas_wkb2prolog.h"
|
|
|
|
#define CALL_POSTGRES(f) \
|
|
{ \
|
|
res = PQ ## f; \
|
|
if (PQstatus(conn) != CONNECTION_OK) {} \
|
|
fprintf(stderr, "Error %s when calling %s, line %d\n", \
|
|
PQerrorMessage(conn), #f,__LINE__); \
|
|
exit (1); \
|
|
} \
|
|
PQclear(res); \
|
|
} \
|
|
|
|
#define GET_POSTGRES(f,x) \
|
|
{ \
|
|
int i; \
|
|
i = sqlite3_ ## f; \
|
|
if (i != SQLITE_ ## x) { \
|
|
fprintf (stderr, "%s failed with status %d: %s\n", \
|
|
#f, i, sqlite3_errmsg (db)); \
|
|
exit (1); \
|
|
} \
|
|
} \
|
|
|
|
|
|
|
|
static Int NULL_id = 0;
|
|
|
|
typedef struct result_set {
|
|
PGconn *db;
|
|
const PGresult *res;
|
|
int i ;
|
|
int nrows;
|
|
int ncols;
|
|
} resultSet;
|
|
|
|
void Yap_InitMYDDAS_PGPreds();
|
|
void Yap_InitBackMYDDAS_PGPreds();
|
|
|
|
|
|
static Int c_PG_connect( USES_REGS1 );
|
|
static Int c_PG_disconnect( USES_REGS1 );
|
|
static Int c_PG_number_of_fields( USES_REGS1 );
|
|
static Int c_PG_get_attributes_types( USES_REGS1 );
|
|
static Int c_PG_query( USES_REGS1 );
|
|
static Int c_PG_table_write( USES_REGS1 );
|
|
static Int c_PG_row( USES_REGS1 );
|
|
static Int c_PG_row_cut( USES_REGS1 );
|
|
static Int c_PG_get_fields_properties( USES_REGS1 );
|
|
static Int c_PG_get_next_result_set( USES_REGS1 );
|
|
static Int c_PG_get_database( USES_REGS1 );
|
|
static Int c_PG_change_database( USES_REGS1 );
|
|
|
|
void Yap_InitMYDDAS_PGPreds(void)
|
|
{
|
|
/* db_dbect: Host x User x Passwd x Database x dbection x ERROR_CODE */
|
|
Yap_InitCPred("c_PG_connect", 7, c_PG_connect, 0);
|
|
|
|
/* db_number_of_fields: Relation x connection x NumberOfFields */
|
|
Yap_InitCPred("c_PG_number_of_fields",3, c_PG_number_of_fields, 0);
|
|
|
|
/* db_get_attributes_types: Relation x TypesList */
|
|
Yap_InitCPred("c_PG_get_attributes_types", 3, c_PG_get_attributes_types, 0);
|
|
|
|
/* db_query: SQLQuery x ResultSet x conection */
|
|
Yap_InitCPred("c_PG_query", 5, c_PG_query, 0);
|
|
|
|
/* db_disconnect: connection */
|
|
Yap_InitCPred("c_PG_disconnect", 1,c_PG_disconnect, 0);
|
|
|
|
/* db_table_write: Result Set */
|
|
Yap_InitCPred("c_PG_table_write", 1, c_PG_table_write, 0);
|
|
|
|
/* db_get_fields_properties: PredName x connection x PropertiesList*/
|
|
Yap_InitCPred("c_PG_get_fields_properties",3,c_PG_get_fields_properties,0);
|
|
|
|
Yap_InitCPred("c_PG_get_next_result_set",2,c_PG_get_next_result_set,0);
|
|
|
|
/* c_PG_get_database: connection x DataBaseName */
|
|
Yap_InitCPred("c_PG_get_database",2,c_PG_get_database,0);
|
|
|
|
/* c_PG_change_database: connection x DataBaseName */
|
|
Yap_InitCPred("c_PG_change_database",2,c_PG_change_database,0);
|
|
|
|
|
|
}
|
|
|
|
void Yap_InitBackMYDDAS_PGPreds(void)
|
|
{
|
|
/* db_row: ResultSet x Arity x ListOfArgs */
|
|
Yap_InitCPredBackCut("c_PG_row", 3, sizeof(Int),
|
|
c_PG_row,
|
|
c_PG_row,
|
|
c_PG_row_cut, 0);
|
|
|
|
}
|
|
|
|
static Int
|
|
c_PG_connect( USES_REGS1 ) {
|
|
int i=0;
|
|
Term arg_host = Deref(ARG1);
|
|
Term arg_user = Deref(ARG2);
|
|
Term arg_passwd = Deref(ARG3);
|
|
Term arg_database = Deref(ARG4);
|
|
Term arg_port = Deref(ARG5);
|
|
Term arg_socket = Deref(ARG6);
|
|
Term arg_conn = ARG7;
|
|
Term tnone = MkAtomTerm(Yap_LookupAtom(""));
|
|
Term tzero = MkIntTerm(0);
|
|
|
|
char *keywords[8], *values[8], ports[16];
|
|
|
|
|
|
if (IsApplTerm(arg_host)) {
|
|
keywords[i] = "hostaddr";
|
|
values[i++] = RepAtom(AtomOfTerm(ArgOfTerm(1, arg_host)))->StrOfAE;
|
|
} else {
|
|
keywords[i] = "host";
|
|
values[i++] = RepAtom(AtomOfTerm(arg_host))->StrOfAE;
|
|
}
|
|
if (IsNonVarTerm(arg_user) && arg_user != tnone && IsAtomTerm(arg_user)) {
|
|
keywords[i] = "user";
|
|
values[i++] = RepAtom(AtomOfTerm(arg_user))->StrOfAE;
|
|
}
|
|
if (IsNonVarTerm(arg_user) && arg_passwd != tnone && IsAtomTerm(arg_passwd)) {
|
|
keywords[i] = "password";
|
|
values[i++] = RepAtom(AtomOfTerm(arg_passwd))->StrOfAE;
|
|
}
|
|
if (IsNonVarTerm(arg_user) && arg_database != tnone && IsAtomTerm(arg_database)) {
|
|
keywords[i] = "dbase";
|
|
values[i++] = RepAtom(AtomOfTerm(arg_database))->StrOfAE;
|
|
}
|
|
if (IsNonVarTerm(arg_user) && arg_port != tzero && IsIntTerm(arg_port)) {
|
|
keywords[i] = "port";
|
|
values[i++] = snprintf(ports, sizeof(ports)-1, "%d", IntOfTerm(arg_port));
|
|
} else if (IsNonVarTerm(arg_socket) && arg_socket != tempty && IsIntTerm(arg_socket)) {
|
|
keywords[i] = "port";
|
|
values[i++] = RepAtom(AtomOfTerm(arg_database))->StrOfAE;
|
|
}
|
|
keywords[i] = NULL;
|
|
values[i++] = NULL;
|
|
|
|
/* Make a connection to the database */
|
|
conn = PQconnectdbParams(keywords, values);
|
|
|
|
/* Check to see that the backend c onnection was successfully made */
|
|
if (PQstatus(conn) != CONNECTION_OK)
|
|
{
|
|
fprintf(stderr, "Connection to database failed: %s",
|
|
PQerrorMessage(conn));
|
|
return FALSE;
|
|
}
|
|
|
|
if (!Yap_unify(arg_conn, MkAddressTerm(conn)))
|
|
return FALSE;
|
|
else
|
|
{
|
|
/* Criar um novo no na lista de ligacoes*/
|
|
new = myddas_util_add_connection(conn,NULL,MYDDAS_API);
|
|
|
|
if (new == NULL){
|
|
#ifdef DEBUG
|
|
printf("ERROR: ** c_db_my_connect ** Error allocating memory\n");
|
|
#endif
|
|
return FALSE;
|
|
}
|
|
return TRUE;
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static MYDDAS_STATS_TIME
|
|
myddas_stat_init_query( PG *db )
|
|
{
|
|
#ifdef MYDDAS_STATS
|
|
MYDDAS_UTIL_connecTION node = myddas_util_search_connection(db);
|
|
MyddasULInt count = 0;
|
|
|
|
/* Count the number of querys made to the server */
|
|
MyddasULInt number_querys;
|
|
MYDDAS_STATS_CON_GET_NUMBER_QUERIES_MADE(node,number_querys);
|
|
MYDDAS_STATS_CON_SET_NUMBER_QUERIES_MADE(node,++number_querys);
|
|
MYDDAS_STATS_CON_GET_NUMBER_QUERIES_MADE_COUNT(node,count);
|
|
MYDDAS_STATS_CON_SET_NUMBER_QUERIES_MADE_COUNT(node,++count);
|
|
/* Measure time spent by the PG Server
|
|
processing the SQL Query */
|
|
return myddas_stats_walltime();
|
|
#else
|
|
return NULL;
|
|
#endif
|
|
}
|
|
|
|
|
|
static MYDDAS_STATS_TIME
|
|
myddas_stat_end_query( MYDDAS_STATS_TIME start )
|
|
{
|
|
MYDDAS_STATS_TIME diff = NULL;
|
|
#ifdef MYDDAS_STATS
|
|
/* Measure time spent by the PG Server
|
|
processing the SQL Query */
|
|
end = myddas_stats_walltime();
|
|
|
|
MYDDAS_STATS_INITIALIZE_TIME_STRUCT(diff,time_copy);
|
|
myddas_stats_subtract_time(diff,end,start);
|
|
diff = myddas_stats_time_copy_to_final(diff);
|
|
|
|
MYDDAS_FREE(end,struct myddas_stats_time_struct);
|
|
MYDDAS_FREE(start,struct myddas_stats_time_struct);
|
|
|
|
MYDDAS_STATS_CON_GET_TOTAL_TIME_DBSERVER(node,total_time);
|
|
/* Automacally updates the MYDDAS_STRUCTURE */
|
|
myddas_stats_add_time(total_time,diff,total_time);
|
|
MYDDAS_STATS_CON_GET_TOTAL_TIME_DBSERVER_COUNT(node,count);
|
|
MYDDAS_STATS_CON_SET_TOTAL_TIME_DBSERVER_COUNT(node,++count);
|
|
|
|
MYDDAS_STATS_TIME time = NULL;
|
|
MYDDAS_STATS_CON_GET_LAST_TIME_DBSERVER(node,time);
|
|
myddas_stats_move_time(diff,time);
|
|
MYDDAS_STATS_CON_GET_LAST_TIME_DBSERVER_COUNT(node,count);
|
|
MYDDAS_STATS_CON_SET_LAST_TIME_DBSERVER_COUNT(node,++count);
|
|
#endif
|
|
return diff;
|
|
}
|
|
|
|
#ifdef MYDDAS_STATS
|
|
/* measure transfer time */
|
|
static void
|
|
myddas_stat_transfer_query( MYDDAS_STATS_TIME diff )
|
|
{
|
|
/* Measure time spent by the PG Server
|
|
transferring the result of the last query
|
|
back to the client */
|
|
start = myddas_stats_walltime();
|
|
/* Measure time spent by the PG Server
|
|
transferring the result of the last query
|
|
back to the client */
|
|
end = myddas_stats_walltime();
|
|
|
|
MYDDAS_STATS_INITIALIZE_TIME_STRUCT(diff,time_copy);
|
|
myddas_stats_subtract_time(diff,end,start);
|
|
diff = MYDDAS_STATS_TIME_copy_to_final(diff);
|
|
|
|
MYDDAS_FREE(end,struct myddas_stats_time_struct);
|
|
MYDDAS_FREE(start,struct myddas_stats_time_struct);
|
|
|
|
MYDDAS_STATS_CON_GET_TOTAL_TIME_TRANSFERING(node,total_time);
|
|
/* Automacally updates the MYDDAS_STRUCTURE */
|
|
myddas_stats_add_time(total_time,diff,total_time);
|
|
MYDDAS_STATS_CON_GET_TOTAL_TIME_TRANSFERING_COUNT(node,count);
|
|
MYDDAS_STATS_CON_SET_TOTAL_TIME_TRANSFERING_COUNT(node,++count);
|
|
|
|
time = NULL;
|
|
MYDDAS_STATS_CON_GET_LAST_TIME_TRANSFERING(node,time);
|
|
MYDDAS_STATS_CON_GET_LAST_TIME_TRANSFERING_COUNT(node,count);
|
|
MYDDAS_STATS_CON_SET_LAST_TIME_TRANSFERING_COUNT(node,++count);
|
|
myddas_stats_move_time(diff,time);
|
|
|
|
/* Measure the number of Rows returned from the server */
|
|
if (res_set != NULL)
|
|
{
|
|
/* With an INSERT statement, PG_(use or store)_result()
|
|
returns a NULL pointer*/
|
|
|
|
/* This is only works if we use PG_store_result */
|
|
MyddasUInt numberRows = PG_num_rows(res_set);
|
|
MyddasUInt rows;
|
|
myddas_stat_transfer_query( diff );
|
|
|
|
MYDDAS_STATS_CON_GET_TOTAL_ROWS(node,rows);
|
|
numberRows = numberRows + rows;
|
|
MYDDAS_STATS_CON_SET_TOTAL_ROWS(node,numberRows);
|
|
MYDDAS_STATS_CON_GET_TOTAL_ROWS_COUNT(node,count);
|
|
MYDDAS_STATS_CON_SET_TOTAL_ROWS_COUNT(node,++count);
|
|
|
|
/* Calculate the ammount of data sent by the server */
|
|
MyddasUInt total,number_fields = PG_num_fields(res_set);
|
|
PG_ROW row;
|
|
MyddasULInt i;
|
|
total=0;
|
|
while ((row = PG_fetch_row(res_set)) != NULL){
|
|
PG_field_seek(res_set,0);
|
|
|
|
for(i=0;i<number_fields;i++){
|
|
if (row[i] != NULL)
|
|
total = total + strlen(row[i]);
|
|
}
|
|
}
|
|
MYDDAS_STATS_CON_SET_LAST_BYTES_TRANSFERING_FROM_DBSERVER(node,total);
|
|
MYDDAS_STATS_CON_GET_LAST_BYTES_TRANSFERING_FROM_DBSERVER_COUNT(node,count);
|
|
MYDDAS_STATS_CON_SET_LAST_BYTES_TRANSFERING_FROM_DBSERVER_COUNT(node,++count);
|
|
|
|
MyddasUInt bytes = 0;
|
|
MYDDAS_STATS_CON_GET_TOTAL_BYTES_TRANSFERING_FROM_DBSERVER(node,bytes);
|
|
total = total + bytes;
|
|
MYDDAS_STATS_CON_SET_TOTAL_BYTES_TRANSFERING_FROM_DBSERVER(node,total);
|
|
MYDDAS_STATS_CON_GET_TOTAL_BYTES_TRANSFERING_FROM_DBSERVER_COUNT(node,count);
|
|
MYDDAS_STATS_CON_SET_TOTAL_BYTES_TRANSFERING_FROM_DBSERVER_COUNT(node,++count);
|
|
}
|
|
}
|
|
#endif
|
|
|
|
/* db_query: SQLQuery x ResultSet x connection */
|
|
static Int
|
|
c_PQ_query( USES_REGS1 ) {
|
|
Term arg_sql_query = Deref(ARG1);
|
|
Term arg_result_set = Deref(ARG2);
|
|
Term arg_db = Deref(ARG3);
|
|
Term arg_mode = Deref(ARG4);
|
|
Term arg_arity = ARG5;
|
|
|
|
char *sql = AtomName(AtomOfTerm(arg_sql_query));
|
|
char *mode = AtomName(AtomOfTerm(arg_mode));
|
|
PGresult *res;
|
|
PGconn *db = AddressOfTerm(arg_db);
|
|
const char *stmt;
|
|
|
|
MYDDAS_STATS_TIME start, end;
|
|
int length=strlen(sql);
|
|
struct result_set *rs = malloc(sizeof( struct result_set));
|
|
if (!rs)
|
|
return FALSE;
|
|
rs->db = db;
|
|
rs->i = 0;
|
|
|
|
start = myddas_stat_init_query( db );
|
|
|
|
char *stmt = new_stmt();
|
|
|
|
/* Send query to server and process it */
|
|
if (strcmp(mode,"store_result")!=0) {
|
|
// Leave data for extraction
|
|
CALL_POSTGRES (prepare(db, stmt, sql, 0, NULL) );
|
|
res = GET_POSTGRES (PQdescribePrepared(db, stmt) );
|
|
rs->stmt = stmt;
|
|
rs->res_set = NULL;
|
|
rs->nrows = -1;
|
|
rs->length = PQnfields(res);
|
|
if (!Yap_unify(arg_arity, MkIntegerTerm(rs->length)))
|
|
{
|
|
free(rs);
|
|
return FALSE;
|
|
}
|
|
if (!Yap_unify(arg_result_set, MkAddressTerm( rs)))
|
|
{
|
|
free(rs);
|
|
return FALSE;
|
|
}
|
|
return TRUE;
|
|
} else{
|
|
// construct an intermediate table, res_set
|
|
char **res_set;
|
|
char *msg;
|
|
int nrows;
|
|
|
|
res = GET_POSTGRES (PGexec(db, sql) );
|
|
|
|
end = myddas_stat_end_query( start );
|
|
length = PGnfields(res);
|
|
nrows = PGntuples(res);
|
|
//INSERT statements don't return any res_set
|
|
if (nrows == 0) {
|
|
return TRUE;
|
|
}
|
|
if (!Yap_unify(arg_arity, MkIntegerTerm(nrows))){
|
|
/*
|
|
* Should PGclear PGresult whenever it is no longer needed to avoid memory
|
|
* leaks
|
|
*/
|
|
free(rs);
|
|
PGclear(res);
|
|
return FALSE;
|
|
}
|
|
rs->stmt = NULL;
|
|
rs->res_set = res_set;
|
|
rs->nrows = nrows;
|
|
rs->length = length;
|
|
if (!Yap_unify(arg_result_set, MkAddressTerm( rs)))
|
|
{
|
|
free(rs);
|
|
PGclear(res);
|
|
return FALSE;
|
|
}
|
|
}
|
|
return TRUE;
|
|
}
|
|
|
|
|
|
static Int
|
|
c_PG_number_of_fields( USES_REGS1 ) {
|
|
Term arg_relation = Deref(ARG1);
|
|
Term arg_db = Deref(ARG2);
|
|
Term arg_fields = ARG3;
|
|
|
|
char *relation = AtomName(AtomOfTerm(arg_relation));
|
|
PG *db = AddressOfTerm(arg_db);
|
|
PG_stmt *stmt;
|
|
|
|
char sql[256];
|
|
|
|
snprintf(sql, 255, "SELECT * FROM TABLE `%s`",relation);
|
|
// Leave data for extraction
|
|
CALL_POSTGRES (prepare(db, stmt, sql, 0, NULL) );
|
|
|
|
/* executar a query SQL */
|
|
|
|
int nrows = GET_POSTGRES (PGntuples(stmt) );
|
|
|
|
PGClear( stmt );
|
|
|
|
return Yap_unify(arg_fields, MkIntegerTerm( nrows ));
|
|
}
|
|
|
|
|
|
/* db_get_attributes_types: RelName x connection -> TypesList */
|
|
static Int
|
|
c_PG_get_attributes_types( USES_REGS1 ) {
|
|
Term arg_relation = Deref(ARG1);
|
|
Term arg_db = Deref(ARG2);
|
|
Term arg_types_list = Deref(ARG3);
|
|
Term list, head;
|
|
|
|
char *relation = AtomName(AtomOfTerm(arg_relation));
|
|
PG *db = AddressOfTerm(arg_db);
|
|
char sql[256];
|
|
int row;
|
|
|
|
PG_stmt *stmt;
|
|
Int rc = TRUE;
|
|
|
|
sprintf(sql,"SELECT * FROM TABLE `%s`",relation);
|
|
// Leave data for extraction
|
|
res = GET_POSTGRES (PGprepare(db, stmt, sql, 0, NULL) );
|
|
|
|
/* executar a query SQL */
|
|
|
|
|
|
int cols = PGnfields( res );
|
|
|
|
list = arg_types_list;
|
|
|
|
for (col = 0; col < cols; col++)
|
|
{
|
|
const char *tm;
|
|
|
|
|
|
head = HeadOfTerm(list);
|
|
rc = (
|
|
rc && Yap_unify(head, MkAtomTerm(Yap_LookupAtom(PGfname(res, col))) ) );
|
|
list = TailOfTerm(list);
|
|
head = HeadOfTerm(list);
|
|
list = TailOfTerm(list);
|
|
|
|
Oid type = PGftype(res, col);
|
|
switch(type) {
|
|
case BYTEOID:
|
|
case CHAROID:
|
|
case INT2OID:
|
|
case INT4OID:
|
|
case INT8OID:
|
|
tm = "integer";
|
|
break;
|
|
case FLOAT4OID:
|
|
case FLOAT8OID:
|
|
tm = "real";
|
|
break;
|
|
case NAMEOID:
|
|
tm = "atom";
|
|
break;
|
|
case TEXTOID:
|
|
tm = "string";
|
|
break;
|
|
default:
|
|
tm = "unknown type";
|
|
break;
|
|
}
|
|
if (!Yap_unify(head, MkAtomTerm(Yap_LookupAtom(tm))) )
|
|
rc = FALSE;
|
|
}
|
|
|
|
PGClear( stmt );
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
/* db_disconnect */
|
|
static Int
|
|
c_PG_disconnect( USES_REGS1 ) {
|
|
Term arg_db = Deref(ARG1);
|
|
|
|
PG *db = AddressOfTerm(arg_db);
|
|
|
|
if ((myddas_util_search_connection(db)) != NULL)
|
|
{
|
|
myddas_util_delete_connection(db);
|
|
PGfinish(db);
|
|
return TRUE;
|
|
}
|
|
else
|
|
{
|
|
return FALSE;
|
|
}
|
|
}
|
|
|
|
/* db_table_write: Result Set */
|
|
static Int
|
|
c_PG_table_write( USES_REGS1 ) {
|
|
/*
|
|
Term arg_res_set = Deref(ARG1);
|
|
PG_RES *res_set = (PG_RES *) IntegerOfTerm(arg_res_set);
|
|
|
|
mydas_util_table_write(res_set);
|
|
PG_free_result(res_set);
|
|
*/
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
c_PG_get_fields_properties( USES_REGS1 ) {
|
|
Term nome_relacao = Deref(ARG1);
|
|
Term arg_db = Deref(ARG2);
|
|
Term fields_properties_list = Deref(ARG3);
|
|
Term head, list;
|
|
|
|
char *relation = AtomName(AtomOfTerm(nome_relacao));
|
|
char sql[256];
|
|
Int num_fields,i;
|
|
PGconn *db = (PG *) (IntegerOfTerm(arg_db));
|
|
|
|
sprintf(sql,"\\d+ `%s`",relation);
|
|
|
|
res = GET_POSTGRES (PGexec(db, sql) );
|
|
|
|
end = myddas_stat_end_query( start );
|
|
Functor functor = Yap_MkFunctor(Yap_LookupAtom("property"),4);
|
|
|
|
Term properties[4];
|
|
|
|
list = fields_properties_list;
|
|
|
|
num_fields = PGntuples(res);
|
|
|
|
if (( coln = PGfnumber(res,"Column")) < 0)
|
|
return FALSE;
|
|
if (( propname = PGfname(res,2)) == NULL)
|
|
return FALSE;
|
|
if (!strstr(propname, "Table") || !strstr(propname, "Modifiers") )
|
|
return FALSE;
|
|
|
|
for (i=0;i<num_fields;i++)
|
|
{
|
|
bool not_null, prim, auto_inc;
|
|
|
|
head = HeadOfTerm(list);
|
|
|
|
if (!PGgetisnull(res, i, coln)) {
|
|
const char *col = PGgetvalue(res, i, coln);
|
|
properties[0] = MkAtomTerm(Yap_LookupAtom(col));
|
|
}
|
|
if (!PGgetisnull(res, i, 2)) {
|
|
const char *props = PGgetvalue(res, i, 2);
|
|
properties[1] = MkIntegerTerm(strstr(props, "not null") != NULL); //Can't be NULL
|
|
properties[2] = MkIntegerTerm(strstr(props, "prim") != NULL); //Can't be NULL
|
|
properties[3] = MkIntegerTerm(strstr(props, "nextval") != NULL); //Can't be NULL
|
|
}
|
|
list = TailOfTerm(list);
|
|
if (!Yap_unify(head, Yap_MkApplTerm(functor,4,properties))){
|
|
return FALSE;
|
|
}
|
|
}
|
|
|
|
PGclear(res);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
/* c_PG_get_next_result_set: connection * NextResSet */
|
|
static Int
|
|
c_PG_get_next_result_set( USES_REGS1 ) {
|
|
Term arg_db = Deref(ARG1);
|
|
Term arg_next_res_set = Deref(ARG2);
|
|
|
|
PGconn *db = AddressOfTerm(arg_db);
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
static Int
|
|
c_PG_get_database( USES_REGS1 ) {
|
|
Term arg_con = Deref(ARG1);
|
|
Term arg_database = Deref(ARG2);
|
|
|
|
if (!Yap_unify(arg_database,arg_con))
|
|
return FALSE;
|
|
|
|
return TRUE;
|
|
|
|
}
|
|
|
|
static Int
|
|
c_PG_change_database( USES_REGS1 ) {
|
|
/* no-op for now */
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
c_PG_row_cut( USES_REGS1 ) {
|
|
struct result_set *res_set=NULL;
|
|
PG *db = res_set->db;
|
|
|
|
res_set = AddressOfTerm(EXTRA_CBACK_CUT_ARG(Term,1));
|
|
PQclear( res_set->res );
|
|
PQfinish( db );
|
|
free(res_set);
|
|
return TRUE;
|
|
}
|
|
|
|
#define cvt( s ) cvt__( s PASS_REGS )
|
|
|
|
static Term
|
|
cvt__(const char *s USES_REGS) {
|
|
return Yap_CharsToTDQ( s, CurrentModule PASS_REGS);
|
|
}
|
|
|
|
/* db_row: ResultSet x Arity_ListOfArgs x ListOfArgs -> */
|
|
static Int
|
|
c_PG_row( USES_REGS1 ) {
|
|
#ifdef MYDDAS_STATS
|
|
/* Measure time used by the */
|
|
/* c_PG_row function */
|
|
//MYDDAS_STATS_TIME start,end,total_time,diff;
|
|
MyddasULInt count = 0;
|
|
start = myddas_stats_walltime();
|
|
#endif
|
|
Term arg_result_set = Deref(ARG1);
|
|
Term arg_arity = Deref(ARG2);
|
|
Term arg_list_args = Deref(ARG3);
|
|
Int rc = TRUE;
|
|
|
|
if (IsVarTerm( arg_result_set ) ) {
|
|
if (!c_PG_query( PASS_REGS1 ) ) {
|
|
cut_fail();
|
|
}
|
|
arg_result_set = Deref(ARG1);
|
|
EXTRA_CBACK_ARG(3,1)= arg_result_set ;
|
|
EXTRA_CBACK_ARG(3,2)= MkIntegerTerm(0) ;
|
|
}
|
|
struct result_set *res_set = AddressOfTerm(arg_result_set);
|
|
|
|
Term head, list, NULL_atom[1];
|
|
Int arity;
|
|
list = arg_list_args;
|
|
arity = IntegerOfTerm(arg_arity);
|
|
PG *db = res_set->db;
|
|
if (res_set->stmt == NULL ) {
|
|
CACHE_REGS
|
|
Int i= IntegerOfTerm(EXTRA_CBACK_CUT_ARG(Term,2)), j;
|
|
Int rc = true;
|
|
// data needs to be copied to Prolog
|
|
// row by row
|
|
#ifdef MYDDAS_STATS
|
|
MYDDAS_STATS_TIME diff;
|
|
|
|
MYDDAS_STATS_INITIALIZE_TIME_STRUCT(diff,time_copy);
|
|
#endif
|
|
for (j = 0; j < arity; j++)
|
|
{
|
|
/* Ts -> List */
|
|
const char *field = res_set->res_set[indx++];
|
|
head = HeadOfTerm(list);
|
|
list = TailOfTerm(list);
|
|
rc = (rc && Yap_unify(head, cvt( field )) );
|
|
}
|
|
if (rc) {
|
|
EXTRA_CBACK_ARG(3,2)= MkIntegerTerm(i+1);
|
|
return rc;
|
|
}
|
|
#ifdef MYDDAS_STATS
|
|
myddas_stat_transfer_query( diff );
|
|
#endif
|
|
cut_fail();
|
|
}
|
|
// busy-waiting
|
|
if (!PQsetnonblocking(res_set->db, 1)) {
|
|
fprintf(stderr, "Connection to database failed: %s",
|
|
PQerrorMessage(conn));
|
|
return FALSE;
|
|
|
|
}
|
|
if (!PQsendQueryPrepared(res_set->db, res_set->stmt, 0, NULL, NULL, NULL, 0)) {
|
|
fprintf(stderr, "Connection to database failed: %s",
|
|
PQerrorMessage(conn));
|
|
return FALSE;
|
|
|
|
}
|
|
int res;
|
|
while((res = PQgetResult(res_set->db)) != NULL);
|
|
if (i == res_set->nrows) {
|
|
// no more data
|
|
PQfinish( res_set->db ) );
|
|
free(res_set);
|
|
#ifdef MYDDAS_STATS
|
|
end = myddas_stats_walltime();
|
|
|
|
MYDDAS_STATS_INITIALIZE_TIME_STRUCT(diff,time_copy);
|
|
myddas_stats_subtract_time(diff,end,start);
|
|
diff = myddas_stats_time_copy_to_final(diff);
|
|
|
|
MYDDAS_FREE(end,struct myddas_stats_time_struct);
|
|
MYDDAS_FREE(start,struct myddas_stats_time_struct);
|
|
|
|
MYDDAS_STATS_GET_DB_ROW_FUNCTION(total_time);
|
|
myddas_stats_add_time(total_time,diff,total_time);
|
|
MYDDAS_STATS_GET_DB_ROW_FUNCTION_COUNT(count);
|
|
MYDDAS_STATS_SET_DB_ROW_FUNCTION_COUNT(++count);
|
|
|
|
MYDDAS_FREE(diff,struct myddas_stats_time_struct);
|
|
#endif /* MYDDAS_STATS */
|
|
cut_fail(); /* This macro already does a return FALSE */
|
|
|
|
} else if (i < res_ser->nrows) {
|
|
list = arg_list_args;
|
|
Term tf;
|
|
|
|
for (j = 0; j < arity; j++)
|
|
{
|
|
/* convert data types here */
|
|
head = HeadOfTerm(list);
|
|
list = TailOfTerm(list);
|
|
|
|
if (!PGgetisnull(res, i, j)) {
|
|
const char *col = PGgetvalue(res, i, j);
|
|
properties[0] = MkAtomTerm(Yap_LookupAtom(col));
|
|
} else {
|
|
NULL_atom[0] = MkIntegerTerm(NULL_id++);
|
|
tf = Yap_MkApplTerm(Yap_MkFunctor(Yap_LookupAtom("NULL"),1),1,NULL_atom);
|
|
}
|
|
}
|
|
if (!Yap_unify(head, tf))
|
|
rc = FALSE;
|
|
#ifdef MYDDAS_STATS
|
|
end = myddas_stats_walltime();
|
|
|
|
myddas_stats_subtract_time(diff,end,start);
|
|
diff = myddas_stats_time_copy_to_final(diff);
|
|
|
|
MYDDAS_FREE(end,struct myddas_stats_time_struct);
|
|
MYDDAS_FREE(start,struct myddas_stats_time_struct);
|
|
|
|
MYDDAS_STATS_GET_DB_ROW_FUNCTION(total_time);
|
|
myddas_stats_add_time(total_time,diff,total_time);
|
|
MYDDAS_STATS_GET_DB_ROW_FUNCTION_COUNT(count);
|
|
MYDDAS_STATS_SET_DB_ROW_FUNCTION_COUNT(++count);
|
|
|
|
MYDDAS_FREE(diff,struct myddas_stats_time_struct);
|
|
#endif /* MYDDAS_STATS */
|
|
} else
|
|
{
|
|
Yap_Error(SYSTEM_ERROR_INTERNAL, TermNil, "PG: %s", PG_errmsg( db ) );
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
|
|
#endif /* MYDDAS_PG */
|