This repository has been archived on 2023-08-20. You can view files and clone it, but cannot push or open issues or pull requests.
yap-6.3/packages/myddas/myddas_postgres.c
Vítor Santos Costa 63c2fa3c6b warning, postgres
2015-06-19 01:19:48 +01:00

891 lines
25 KiB
C

/*************************************************************************
* *
* YAP Prolog *
* *
* Yap Prolog was developed at NCCUP - Universidade do Porto *
* *
* Copyright L.Damas, V.S.Costa and Universidade do Porto 1985-1997 *
* *
**************************************************************************
* *
* File: myddas_postgres.c *
* Last rev: 22/03/05 *
* mods: *
* comments: Predicates for comunicating with a pq database system *
* *
*************************************************************************/
#ifdef MYDDAS_POSTGRES
#include <libpq-fe.h>
#define BOOLOID 16
#define BYTEOID 17
#define CHAROID 18
#define NAMEOID 19
#define INT8OID 20
#define INT2OID 21
#define INT2VECTOROID 22
#define INT4OID 23
#define REGPROCOID 24
#define TEXTOID 25
#define OIDOID 26
#define TIDOID 27
#define XIDOID 28
#define CIDOID 29
#define OIDVECTOROID 30
#define JSONOID 114
#define XMLOID 142
#define PGNODETREEOID 194
#define POINTOID 600
#define LSEGOID 601
#define PATHOID 602
#define BOXOID 603
#define POLYGONOID 604
#define LINEOID 628
#define FLOAT4OID 700
#define FLOAT8OID 701
#define ABSTIMEOID 702
#define RELTIMEOID 703
#define TINTERVALOID 704
#define UNKNOWNOID 705
#define CIRCLEOID 718
#define CASHOID 790
#define MACADDROID 829
#define INETOID 869
#define CIDROID 650
#define INT2ARRAYOID 1005
#define INT4ARRAYOID 1007
#define TEXTARRAYOID 1009
#define OIDARRAYOID 1028
#define FLOAT4ARRAYOID 1021
#define ACLITEMOID 1033
#define CSTRINGARRAYOID 1263
#define BPCHAROID 1042
#define VARCHAROID 1043
#define DATEOID 1082
#define TIMEOID 1083
#define TIMESTAMPOID 1114
#define TIMESTAMPTZOID 1184
#define INTERVALOID 1186
#define TIMETZOID 1266
#define BITOID 1560
#define VARBITOID 1562
#define NUMERICOID 1700
#define REFCURSOROID 1790
#define REGPROCEDUREOID 2202
#define REGOPEROID 2203
#define REGOPERATOROID 2204
#define REGCLASSOID 2205
#define REGTYPEOID 2206
#define REGTYPEARRAYOID 2211
#define UUIDOID 2950
#define LSNOID 3220
#define TSVECTOROID 3614
#define GTSVECTOROID 3642
#define TSQUERYOID 3615
#define REGCONFIGOID 3734
#define REGDICTIONARYOID 3769
#define JSONBOID 3802
#define INT4RANGEOID 3904
#define RECORDOID 2249
#define RECORDARRAYOID 2287
#define CSTRINGOID 2275
#define ANYOID 2276
#define ANYARRAYOID 2277
#define VOIDOID 2278
#define TRIGGEROID 2279
#define EVTTRIGGEROID 3838
#define LANGUAGE_HANDLEROID 2280
#define INTERNALOID 2281
#define OPAQUEOID 2282
#define ANYELEMENTOID 2283
#define ANYNONARRAYOID 2776
#define ANYENUMOID 3500
#define FDW_HANDLEROID 3115
#define ANYRANGEOID 3831
#include "Yap.h"
#include "Yatom.h"
#include "YapText.h"
#include "cut_c.h"
#include "eval.h"
#include "myddas.h"
#ifdef MYDDAS_STATS
#include "myddas_structs.h"
#include "myddas_statistics.h"
#endif
#include "myddas_wkb2prolog.h"
#define CALL_POSTGRES(f) \
{ \
res = PQ ## f; \
if (PQstatus(db) != CONNECTION_OK) { \
fprintf(stderr, "Error %s when calling %s, line %d\n", \
PQerrorMessage(db), #f,__LINE__); \
exit (1); \
} \
PQclear(res); \
} \
#define GET_POSTGRES(f, res) \
{ \
res = PQ ## f; \
if (PQstatus(db) != CONNECTION_OK) { \
fprintf (stderr, "%s failed with status %s, line %d\n", \
#f, PQerrorMessage (db), __LINE__); \
exit (1); \
} \
} \
static Int NULL_id = 0;
typedef struct result_set {
PGconn *db;
PGresult *res;
const char *stmt;
int i ;
int nrows;
int ncols;
} resultSet;
void Yap_InitMYDDAS_PGPreds(void);
void Yap_InitBackMYDDAS_PGPreds(void);
static Int c_postgres_connect( USES_REGS1 );
static Int c_postgres_disconnect( USES_REGS1 );
static Int c_postgres_number_of_fields( USES_REGS1 );
static Int c_postgres_get_attributes_types( USES_REGS1 );
static Int c_postgres_query( USES_REGS1 );
static Int c_postgres_table_write( USES_REGS1 );
static Int c_postgres_row( USES_REGS1 );
static Int c_postgres_row_cut( USES_REGS1 );
static Int c_postgres_get_fields_properties( USES_REGS1 );
static Int c_postgres_get_next_result_set( USES_REGS1 );
static Int c_postgres_get_database( USES_REGS1 );
static Int c_postgres_change_database( USES_REGS1 );
void Yap_InitMYDDAS_PGPreds(void)
{
/* db_dbect: Host x User x Passwd x Database x dbection x ERROR_CODE */
Yap_InitCPred("c_postgres_connect", 7, c_postgres_connect, 0);
/* db_number_of_fields: Relation x connection x NumberOfFields */
Yap_InitCPred("c_postgres_number_of_fields",3, c_postgres_number_of_fields, 0);
/* db_get_attributes_types: Relation x TypesList */
Yap_InitCPred("c_postgres_get_attributes_types", 3, c_postgres_get_attributes_types, 0);
/* db_query: SQLQuery x ResultSet x conection */
Yap_InitCPred("c_postgres_query", 5, c_postgres_query, 0);
/* db_disconnect: connection */
Yap_InitCPred("c_postgres_disconnect", 1,c_postgres_disconnect, 0);
/* db_table_write: Result Set */
Yap_InitCPred("c_postgres_table_write", 1, c_postgres_table_write, 0);
/* db_get_fields_properties: PredName x connection x PropertiesList*/
Yap_InitCPred("c_postgres_get_fields_properties",3,c_postgres_get_fields_properties,0);
Yap_InitCPred("c_postgres_get_next_result_set",2,c_postgres_get_next_result_set,0);
/* c_postgres_get_database: connection x DataBaseName */
Yap_InitCPred("c_postgres_get_database",2,c_postgres_get_database,0);
/* c_postgres_change_database: connection x DataBaseName */
Yap_InitCPred("c_postgres_change_database",2,c_postgres_change_database,0);
}
void Yap_InitBackMYDDAS_PGPreds(void)
{
/* db_row: ResultSet x Arity x ListOfArgs */
Yap_InitCPredBackCut("c_postgres_row", 3, sizeof(Int),
c_postgres_row,
c_postgres_row,
c_postgres_row_cut, 0);
}
static Int
c_postgres_connect( USES_REGS1 ) {
int i=0;
Term arg_host = Deref(ARG1);
Term arg_user = Deref(ARG2);
Term arg_passwd = Deref(ARG3);
Term arg_database = Deref(ARG4);
Term arg_port = Deref(ARG5);
Term arg_socket = Deref(ARG6);
Term arg_conn = ARG7;
Term tempty = MkAtomTerm(Yap_LookupAtom(""));
Term tzero = MkIntTerm(0);
const char *keywords[8], *values[8];
char ports[16];
if (IsApplTerm(arg_host)) {
keywords[i] = "hostaddr";
values[i++] = RepAtom(AtomOfTerm(ArgOfTerm(1, arg_host)))->StrOfAE;
} else {
keywords[i] = "host";
values[i++] = RepAtom(AtomOfTerm(arg_host))->StrOfAE;
}
if (IsNonVarTerm(arg_user) && arg_user != tempty && IsAtomTerm(arg_user)) {
keywords[i] = "user";
values[i++] = RepAtom(AtomOfTerm(arg_user))->StrOfAE;
}
if (IsNonVarTerm(arg_user) && arg_passwd != tempty && IsAtomTerm(arg_passwd)) {
keywords[i] = "password";
values[i++] = RepAtom(AtomOfTerm(arg_passwd))->StrOfAE;
}
if (IsNonVarTerm(arg_user) && arg_database != tempty && IsAtomTerm(arg_database)) {
keywords[i] = "dbase";
values[i++] = RepAtom(AtomOfTerm(arg_database))->StrOfAE;
}
if (IsNonVarTerm(arg_user) && arg_port != tzero && IsIntTerm(arg_port)) {
keywords[i] = "port";
snprintf(ports, sizeof(ports)-1, "%d", (int)IntOfTerm(arg_port));
values[i++] = ports;
} else if (IsNonVarTerm(arg_socket) && arg_socket != tempty && IsIntTerm(arg_socket)) {
keywords[i] = "port";
values[i++] = RepAtom(AtomOfTerm(arg_database))->StrOfAE;
}
keywords[i] = NULL;
values[i++] = NULL;
/* Make a connection to the database */
PGconn *db = PQconnectdbParams(keywords, values, 0);
/* Check to see that the backend c onnection was successfully made */
if (PQstatus(db) != CONNECTION_OK)
{
fprintf(stderr, "Connection to database failed: %s",
PQerrorMessage(db));
return FALSE;
}
if (!Yap_unify(arg_conn, MkAddressTerm(db)))
return FALSE;
else
{
/* Criar um novo no na lista de ligacoes*/
MYDDAS_UTIL_CONNECTION cnew = myddas_util_add_connection(db,NULL,MYDDAS_POSTGRES);
if (cnew == NULL){
#ifdef DEBUG
printf("ERROR: ** c_db_my_connect ** Error allocating memory\n");
#endif
return FALSE;
}
return TRUE;
}
}
static MYDDAS_STATS_TIME
myddas_stat_init_query( PGconn *db )
{
#ifdef MYDDAS_STATS
MYDDAS_UTIL_connecTION node = myddas_util_search_connection(db);
MyddasULInt count = 0;
/* Count the number of querys made to the server */
MyddasULInt number_querys;
MYDDAS_STATS_CON_GET_NUMBER_QUERIES_MADE(node,number_querys);
MYDDAS_STATS_CON_SET_NUMBER_QUERIES_MADE(node,++number_querys);
MYDDAS_STATS_CON_GET_NUMBER_QUERIES_MADE_COUNT(node,count);
MYDDAS_STATS_CON_SET_NUMBER_QUERIES_MADE_COUNT(node,++count);
/* Measure time spent by the PG Server
processing the SQL Query */
return myddas_stats_walltime();
#else
return NULL;
#endif
}
static MYDDAS_STATS_TIME
myddas_stat_end_query( MYDDAS_STATS_TIME start )
{
MYDDAS_STATS_TIME diff = NULL;
#ifdef MYDDAS_STATS
/* Measure time spent by the PG Server
processing the SQL Query */
MYDDAS_STATS_TIME end = myddas_stats_walltime();
MYDDAS_STATS_INITIALIZE_TIME_STRUCT(diff,time_copy);
myddas_stats_subtract_time(diff,end,start);
diff = myddas_stats_time_copy_to_final(diff);
MYDDAS_FREE(end,struct myddas_stats_time_struct);
MYDDAS_FREE(start,struct myddas_stats_time_struct);
MYDDAS_STATS_CON_GET_TOTAL_TIME_DBSERVER(node,total_time);
/* Automacally updates the MYDDAS_STRUCTURE */
myddas_stats_add_time(total_time,diff,total_time);
MYDDAS_STATS_CON_GET_TOTAL_TIME_DBSERVER_COUNT(node,count);
MYDDAS_STATS_CON_SET_TOTAL_TIME_DBSERVER_COUNT(node,++count);
MYDDAS_STATS_TIME time = NULL;
MYDDAS_STATS_CON_GET_LAST_TIME_DBSERVER(node,time);
myddas_stats_move_time(diff,time);
MYDDAS_STATS_CON_GET_LAST_TIME_DBSERVER_COUNT(node,count);
MYDDAS_STATS_CON_SET_LAST_TIME_DBSERVER_COUNT(node,++count);
#endif
return diff;
}
#ifdef MYDDAS_STATS
/* measure transfer time */
static void
myddas_stat_transfer_query( MYDDAS_STATS_TIME diff )
{
/* Measure time spent by the PG Server
transferring the result of the last query
back to the client */
start = myddas_stats_walltime();
/* Measure time spent by the PG Server
transferring the result of the last query
back to the client */
end = myddas_stats_walltime();
MYDDAS_STATS_INITIALIZE_TIME_STRUCT(diff,time_copy);
myddas_stats_subtract_time(diff,end,start);
diff = MYDDAS_STATS_TIME_copy_to_final(diff);
MYDDAS_FREE(end,struct myddas_stats_time_struct);
MYDDAS_FREE(start,struct myddas_stats_time_struct);
MYDDAS_STATS_CON_GET_TOTAL_TIME_TRANSFERING(node,total_time);
/* Automacally updates the MYDDAS_STRUCTURE */
myddas_stats_add_time(total_time,diff,total_time);
MYDDAS_STATS_CON_GET_TOTAL_TIME_TRANSFERING_COUNT(node,count);
MYDDAS_STATS_CON_SET_TOTAL_TIME_TRANSFERING_COUNT(node,++count);
time = NULL;
MYDDAS_STATS_CON_GET_LAST_TIME_TRANSFERING(node,time);
MYDDAS_STATS_CON_GET_LAST_TIME_TRANSFERING_COUNT(node,count);
MYDDAS_STATS_CON_SET_LAST_TIME_TRANSFERING_COUNT(node,++count);
myddas_stats_move_time(diff,time);
/* Measure the number of Rows returned from the server */
if (res_set != NULL)
{
/* With an INSERT statement, PG_(use or store)_result()
returns a NULL pointer*/
/* This is only works if we use PG_store_result */
MyddasUInt numberRows = PG_num_rows(res_set);
MyddasUInt rows;
myddas_stat_transfer_query( diff );
MYDDAS_STATS_CON_GET_TOTAL_ROWS(node,rows);
numberRows = numberRows + rows;
MYDDAS_STATS_CON_SET_TOTAL_ROWS(node,numberRows);
MYDDAS_STATS_CON_GET_TOTAL_ROWS_COUNT(node,count);
MYDDAS_STATS_CON_SET_TOTAL_ROWS_COUNT(node,++count);
/* Calculate the ammount of data sent by the server */
MyddasUInt total,number_fields = PG_num_fields(res_set);
PG_ROW row;
MyddasULInt i;
total=0;
while ((row = PG_fetch_row(res_set)) != NULL){
PG_field_seek(res_set,0);
for(i=0;i<number_fields;i++){
if (row[i] != NULL)
total = total + strlen(row[i]);
}
}
MYDDAS_STATS_CON_SET_LAST_BYTES_TRANSFERING_FROM_DBSERVER(node,total);
MYDDAS_STATS_CON_GET_LAST_BYTES_TRANSFERING_FROM_DBSERVER_COUNT(node,count);
MYDDAS_STATS_CON_SET_LAST_BYTES_TRANSFERING_FROM_DBSERVER_COUNT(node,++count);
MyddasUInt bytes = 0;
MYDDAS_STATS_CON_GET_TOTAL_BYTES_TRANSFERING_FROM_DBSERVER(node,bytes);
total = total + bytes;
MYDDAS_STATS_CON_SET_TOTAL_BYTES_TRANSFERING_FROM_DBSERVER(node,total);
MYDDAS_STATS_CON_GET_TOTAL_BYTES_TRANSFERING_FROM_DBSERVER_COUNT(node,count);
MYDDAS_STATS_CON_SET_TOTAL_BYTES_TRANSFERING_FROM_DBSERVER_COUNT(node,++count);
}
}
#endif
/* db_query: SQLQuery x ResultSet x connection */
static Int
c_postgres_query( USES_REGS1 ) {
Term arg_sql_query = Deref(ARG1);
Term arg_result_set = Deref(ARG2);
Term arg_db = Deref(ARG3);
Term arg_mode = Deref(ARG4);
Term arg_arity = ARG5;
const char *sql = AtomName(AtomOfTerm(arg_sql_query));
const char *mode = AtomName(AtomOfTerm(arg_mode));
PGresult *res;
PGconn *db = AddressOfTerm(arg_db);
struct result_set *rs = malloc(sizeof( struct result_set));
if (!rs)
return FALSE;
rs->db = db;
rs->i = 0;
MYDDAS_STATS_TIME start, end;
start = myddas_stat_init_query( db );
const char *stmt = AtomName(Yap_LookupAtom(sql));
/* Send query to server and process it */
if (strcmp(mode,"store_result")!=0) {
// Leave data for extraction
CALL_POSTGRES (prepare(db, stmt, sql, 0, NULL) );
GET_POSTGRES (describePrepared(db, stmt) , res );
rs->stmt = stmt;
rs->res = NULL;
rs->nrows = -1;
rs->ncols = PQnfields(res);
if (!Yap_unify(arg_arity, MkIntegerTerm(rs->ncols)))
{
free(rs);
return FALSE;
}
if (!Yap_unify(arg_result_set, MkAddressTerm( rs)))
{
free(rs);
return FALSE;
}
return TRUE;
} else {
// construct an intermediate table, res_set
int nrows, ncols;
GET_POSTGRES (exec(db, sql), res );
end = myddas_stat_end_query( start );
ncols = PQnfields(res);
nrows = PQntuples(res);
//INSERT statements don't return any res_set
if (nrows == 0) {
return TRUE;
}
if (!Yap_unify(arg_arity, MkIntegerTerm(nrows))){
/*
* Should PGclear PGresult whenever it is no longer needed to avoid memory
* leaks
*/
free(rs);
PQclear(res);
return FALSE;
}
rs->stmt = NULL;
rs->res = res;
rs->nrows = nrows;
rs->ncols = ncols;
if (!Yap_unify(arg_result_set, MkAddressTerm( rs)))
{
free(rs);
PQclear(res);
return FALSE;
}
}
return TRUE;
}
static Int
c_postgres_number_of_fields( USES_REGS1 ) {
Term arg_relation = Deref(ARG1);
Term arg_db = Deref(ARG2);
Term arg_fields = ARG3;
const char *relation = AtomName(AtomOfTerm(arg_relation));
PGconn *db = AddressOfTerm(arg_db);
const char *stmt;
PGresult *res;
char sql[256];
snprintf(sql, 255, "SELECT * FROM TABLE `%s`",relation);
// Leave data for extraction
stmt = AtomName(Yap_LookupAtom(sql));
CALL_POSTGRES (prepare(db, stmt, sql, 0, NULL) );
/* executar a query SQL */
int nrows = PQntuples(res);
PQclear( res );
return Yap_unify(arg_fields, MkIntegerTerm( nrows ));
}
/* db_get_attributes_types: RelName x connection -> TypesList */
static Int
c_postgres_get_attributes_types( USES_REGS1 ) {
Term arg_relation = Deref(ARG1);
Term arg_db = Deref(ARG2);
Term arg_types_list = Deref(ARG3);
Term list, head;
const char *relation = AtomName(AtomOfTerm(arg_relation));
PGconn *db = AddressOfTerm(arg_db);
char sql[256];
//int row;
PGresult *res;
const char *stmt;
Int rc = TRUE;
sprintf(sql,"SELECT * FROM TABLE `%s`",relation);
// Leave data for extraction
stmt = AtomName(Yap_LookupAtom(sql));
// Leave data for extraction
GET_POSTGRES (prepare(db, stmt, sql, 0, NULL), res );
/* executar a query SQL */
int cols = PQnfields( res ), col;
list = arg_types_list;
for (col = 0; col < cols; col++)
{
const char *tm;
head = HeadOfTerm(list);
rc = (
rc && Yap_unify(head, MkAtomTerm(Yap_LookupAtom(PQfname(res, col))) ) );
list = TailOfTerm(list);
head = HeadOfTerm(list);
list = TailOfTerm(list);
Oid type = PQftype(res, col);
switch(type) {
case BYTEOID:
case CHAROID:
case INT2OID:
case INT4OID:
case INT8OID:
tm = "integer";
break;
case FLOAT4OID:
case FLOAT8OID:
tm = "real";
break;
case NAMEOID:
tm = "atom";
break;
case TEXTOID:
tm = "string";
break;
default:
tm = "unknown type";
break;
}
if (!Yap_unify(head, MkAtomTerm(Yap_LookupAtom(tm))) )
rc = FALSE;
}
PQclear( res );
return rc;
}
/* db_disconnect */
static Int
c_postgres_disconnect( USES_REGS1 ) {
Term arg_db = Deref(ARG1);
PGconn *db = AddressOfTerm(arg_db);
if ((myddas_util_search_connection(db)) != NULL)
{
myddas_util_delete_connection(db);
PQfinish(db);
return TRUE;
}
else
{
return FALSE;
}
}
/* db_table_write: Result Set */
static Int
c_postgres_table_write( USES_REGS1 ) {
/*
Term arg_res_set = Deref(ARG1);
PG_RES *res_set = (PG_RES *) IntegerOfTerm(arg_res_set);
mydas_util_table_write(res_set);
PG_free_result(res_set);
*/
return TRUE;
}
static Int
c_postgres_get_fields_properties( USES_REGS1 ) {
Term nome_relacao = Deref(ARG1);
Term arg_db = Deref(ARG2);
Term fields_properties_list = Deref(ARG3);
Term head, list;
PGresult *res;
const char *relation = AtomName(AtomOfTerm(nome_relacao));
char sql[256];
Int num_fields,i;
PGconn *db = AddressOfTerm(arg_db);
sprintf(sql,"\\d+ `%s`",relation);
GET_POSTGRES (exec(db, sql), res );
#if MYDDAS_STATS
end = myddas_stat_end_query( start );
#endif
Functor functor = Yap_MkFunctor(Yap_LookupAtom("property"),4);
list = fields_properties_list;
num_fields = PQntuples(res);
int coln;
const char *propname;
if (( coln = PQfnumber(res,"Column")) < 0)
return FALSE;
if (( propname = PQfname(res,2)) == NULL)
return FALSE;
if (!strstr(propname, "Table") || !strstr(propname, "Modifiers") )
return FALSE;
for (i=0;i<num_fields;i++)
{
Term properties[4];
head = HeadOfTerm(list);
if (!PQgetisnull(res, i, coln)) {
const char *col = PQgetvalue(res, i, coln);
properties[0] = MkAtomTerm(Yap_LookupAtom(col));
}
if (!PQgetisnull(res, i, 2)) {
const char *props = PQgetvalue(res, i, 2);
properties[1] = MkIntegerTerm(strstr(props, "not null") != NULL); //Can't be NULL
properties[2] = MkIntegerTerm(strstr(props, "prim") != NULL); //Can't be NULL
properties[3] = MkIntegerTerm(strstr(props, "nextval") != NULL); //Can't be NULL
}
list = TailOfTerm(list);
if (!Yap_unify(head, Yap_MkApplTerm(functor,4,properties))){
return FALSE;
}
}
PQclear(res);
return TRUE;
}
/* c_postgres_get_next_result_set: connection * NextResSet */
static Int
c_postgres_get_next_result_set( USES_REGS1 ) {
//Term arg_db = Deref(ARG1);
//Term arg_next_res_set = Deref(ARG2);
//PGconn *db = AddressOfTerm(arg_db);
return FALSE;
}
static Int
c_postgres_get_database( USES_REGS1 ) {
Term arg_con = Deref(ARG1);
Term arg_database = Deref(ARG2);
if (!Yap_unify(arg_database,arg_con))
return FALSE;
return TRUE;
}
static Int
c_postgres_change_database( USES_REGS1 ) {
/* no-op for now */
return TRUE;
}
static Int
c_postgres_row_cut( USES_REGS1 ) {
struct result_set *res_set=NULL;
PGconn *db = res_set->db;
res_set = AddressOfTerm(EXTRA_CBACK_CUT_ARG(Term,1));
PQclear( res_set->res );
PQfinish( db );
free(res_set);
return TRUE;
}
#define cvt( s ) cvt__( s PASS_REGS )
static Term
cvt__(const char *s USES_REGS) {
return Yap_CharsToTDQ( s, CurrentModule PASS_REGS);
}
/* db_row: ResultSet x Arity_ListOfArgs x ListOfArgs -> */
static Int
c_postgres_row( USES_REGS1 ) {
#ifdef MYDDAS_STATS
/* Measure time used by the */
/* c_postgres_row function */
//MYDDAS_STATS_TIME start,end,total_time,diff;
MyddasULInt count = 0;
MYDDAS_STATS_TIME start, end;
start = myddas_stats_walltime();
#endif
Term arg_result_set = Deref(ARG1);
Term arg_arity = Deref(ARG2);
Term arg_list_args = Deref(ARG3);
Int rc = TRUE;
if (IsVarTerm( arg_result_set ) ) {
if (!c_postgres_query( PASS_REGS1 ) ) {
cut_fail();
}
arg_result_set = Deref(ARG1);
EXTRA_CBACK_ARG(3,1)= arg_result_set ;
EXTRA_CBACK_ARG(3,2)= MkIntegerTerm(0) ;
}
struct result_set *res_set = AddressOfTerm(arg_result_set);
Term head, list, NULL_atom[1];
Int arity;
list = arg_list_args;
arity = IntegerOfTerm(arg_arity);
PGconn *db = res_set->db;
if (res_set->stmt == NULL ) {
CACHE_REGS
Int i= IntegerOfTerm(EXTRA_CBACK_CUT_ARG(Term,2)), j;
Int rc = true;
// data needs to be copied to Prolog
// row by row
#ifdef MYDDAS_STATS
MYDDAS_STATS_TIME diff;
MYDDAS_STATS_INITIALIZE_TIME_STRUCT(diff,time_copy);
#endif
for (j = 0; j < arity; j++)
{
/* Ts -> List */
const char *field = PQgetvalue(res_set->res, i, j);
head = HeadOfTerm(list);
list = TailOfTerm(list);
rc = (rc && Yap_unify(head, cvt( field )) );
}
if (rc) {
EXTRA_CBACK_ARG(3,2)= MkIntegerTerm(i+1);
return rc;
}
#ifdef MYDDAS_STATS
myddas_stat_transfer_query( diff );
#endif
cut_fail();
}
// busy-waiting
if (!PQsetnonblocking(res_set->db, 1)) {
fprintf(stderr, "Connection to database failed: %s",
PQerrorMessage(db));
return FALSE;
}
if (!PQsendQueryPrepared(res_set->db, res_set->stmt, 0, NULL, NULL, NULL, 0)) {
fprintf(stderr, "Connection to database failed: %s",
PQerrorMessage(db));
return FALSE;
}
PGresult *res;
while((res = PQgetResult(res_set->db)) != NULL);
int i;
if ((i = res_set->nrows) == 0) {
// no more data
PQfinish( res_set->db );
free(res_set);
#ifdef MYDDAS_STATS
end = myddas_stats_walltime();
MYDDAS_STATS_INITIALIZE_TIME_STRUCT(diff,time_copy);
myddas_stats_subtract_time(diff,end,start);
diff = myddas_stats_time_copy_to_final(diff);
MYDDAS_FREE(end,struct myddas_stats_time_struct);
MYDDAS_FREE(start,struct myddas_stats_time_struct);
MYDDAS_STATS_GET_DB_ROW_FUNCTION(total_time);
myddas_stats_add_time(total_time,diff,total_time);
MYDDAS_STATS_GET_DB_ROW_FUNCTION_COUNT(count);
MYDDAS_STATS_SET_DB_ROW_FUNCTION_COUNT(++count);
MYDDAS_FREE(diff,struct myddas_stats_time_struct);
#endif /* MYDDAS_STATS */
cut_fail(); /* This macro already does a return FALSE */
} else if (i < res_set->nrows) {
list = arg_list_args;
Term tf;
int j;
for (j = 0; j < arity; j++)
{
/* convert data types here */
head = HeadOfTerm(list);
list = TailOfTerm(list);
if (!PQgetisnull(res, i, j)) {
const char *col = PQgetvalue(res, i, j);
tf = MkAtomTerm(Yap_LookupAtom(col));
} else {
NULL_atom[0] = MkIntegerTerm(NULL_id++);
tf = Yap_MkApplTerm(Yap_MkFunctor(Yap_LookupAtom("NULL"),1),1,NULL_atom);
}
}
if (!Yap_unify(head, tf))
rc = FALSE;
#ifdef MYDDAS_STATS
end = myddas_stats_walltime();
myddas_stats_subtract_time(diff,end,start);
diff = myddas_stats_time_copy_to_final(diff);
MYDDAS_FREE(end,struct myddas_stats_time_struct);
MYDDAS_FREE(start,struct myddas_stats_time_struct);
MYDDAS_STATS_GET_DB_ROW_FUNCTION(total_time);
myddas_stats_add_time(total_time,diff,total_time);
MYDDAS_STATS_GET_DB_ROW_FUNCTION_COUNT(count);
MYDDAS_STATS_SET_DB_ROW_FUNCTION_COUNT(++count);
MYDDAS_FREE(diff,struct myddas_stats_time_struct);
#endif /* MYDDAS_STATS */
} return rc;
}
#endif /* MYDDAS_PG */