Changeset 962 for trunk/zoo-project/zoo-kernel/service_callback.c
- Timestamp:
- Oct 21, 2020, 6:31:09 PM (4 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/zoo-project/zoo-kernel/service_callback.c
r955 r962 27 27 */ 28 28 29 #include "service_json.h"30 29 #include "service_internal_ms.h" 31 #include "sqlapi.h"32 30 #include <pthread.h> 33 31 #include <libxml/tree.h> … … 42 40 43 41 #include "service_callback.h" 42 #include "service_json.h" 43 #include "sqlapi.h" 44 #include <ulinet.h> 45 44 46 45 47 #ifdef __cplusplus … … 66 68 */ 67 69 int cStep=0; 70 /** 71 * Maximum value of PercentCompleted 72 */ 73 int maxProgress=0; 68 74 /** 69 75 * Is there any ongoing HTTP request … … 112 118 return false; 113 119 } 114 120 /** 121 * Practically invoke the callback, meaning sending the HTTP POST request. 122 * 123 * @param args local_params containing all the variables required 124 */ 125 void* _invokeBasicCallback(void* args){ 126 #ifdef CALLBACK_DEBUG 127 fprintf(stderr,"************************* From thread %d %s %d: REQUEST CONFIGURE (%s)\n",pthread_self(),__FILE__,__LINE__,arg->url->value); 128 fflush(stderr); 129 #endif 130 local_params* arg=(local_params*)args; 131 if(arg->state<cStep){ 132 #ifdef CALLBACK_DEBUG 133 fprintf(stderr,"************************* From thread %d %s %d: REQUEST CANCELLED (%s) EXIT!\n",pthread_self(),__FILE__,__LINE__,arg->url->value); 134 fflush(stderr); 135 #endif 136 freeMaps(&arg->conf); 137 free(arg->conf); 138 freeMap(&arg->url); 139 free(arg->url); 140 pthread_exit(NULL); 141 return NULL; 142 } 143 HINTERNET hInternet,res1; 144 const struct tm *tm; 145 size_t len; 146 char *tmp1; 147 map *tmpStatus; 148 map* pmTmp=getMapFromMaps(arg->conf,"lenv","status"); 149 hInternet=InternetOpen("ZooWPSClient\0", 150 INTERNET_OPEN_TYPE_PRECONFIG, 151 NULL,NULL, 0); 152 if(!CHECK_INET_HANDLE(hInternet)){ 153 InternetCloseHandle (&hInternet); 154 return NULL; 155 } 156 const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN); 157 while( arg->state != SERVICE_SUCCEEDED && arg->state != SERVICE_FAILED && isOngoing>0 ){ 158 zSleep(100); 159 } 160 if(arg->state==SERVICE_STARTED && pmTmp!=NULL){ 161 if(maxProgress<=atoi(pmTmp->value)){ 162 maxProgress=atoi(pmTmp->value); 163 }else{ 164 #ifdef CALLBACK_DEBUG 165 fprintf(stderr,"************************* From thread %d %s %d: REQUEST CANCELLED (%s) EXIT!\n",pthread_self(),__FILE__,__LINE__,arg->url->value); 166 fflush(stderr); 167 #endif 168 freeMaps(&arg->conf); 169 free(arg->conf); 170 freeMap(&arg->url); 171 free(arg->url); 172 pthread_exit(NULL); 173 return NULL; 174 } 175 }else 176 maxProgress=101; 177 isOngoing=1; 178 maps* tmpConf=createMaps("main"); 179 tmpConf->content=createMap("memory","load"); 180 181 hInternet.waitingRequests[0] = zStrdup(arg->url->value); 182 res1 = InternetOpenUrl (&hInternet, 183 hInternet.waitingRequests[0], 184 (char*)jsonStr, strlen(jsonStr), 185 INTERNET_FLAG_NO_CACHE_WRITE, 186 0,tmpConf); 187 AddHeaderEntries(&hInternet,arg->conf); 188 AddMissingHeaderEntry(&hInternet.ihandle[hInternet.nb-1],"Content-Type","application/json"); 189 #ifdef CALLBACK_DEBUG 190 curl_easy_setopt(hInternet.ihandle[hInternet.nb-1].handle, CURLOPT_VERBOSE, 1); 191 #endif 192 if(hInternet.ihandle[hInternet.nb-1].header!=NULL) 193 curl_easy_setopt(hInternet.ihandle[hInternet.nb-1].handle,CURLOPT_HTTPHEADER,hInternet.ihandle[hInternet.nb-1].header); 194 processDownloads(&hInternet); 195 freeMaps(&tmpConf); 196 free(tmpConf); 197 #ifdef CALLBACK_DEBUG 198 char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1) 199 * sizeof (char)); 200 if (tmp == NULL) 201 { 202 setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory")); 203 setMapInMaps(arg->conf,"lenv","code","InternalError"); 204 return NULL; 205 } 206 size_t bRead; 207 InternetReadFile (hInternet.ihandle[0], 208 (LPVOID) tmp, 209 hInternet. 210 ihandle[0].nDataLen, 211 &bRead); 212 tmp[hInternet.ihandle[0].nDataLen] = 0; 213 fprintf(stderr,"************************* From thread %d %s %d: REQUEST END \n%s",pthread_self(),__FILE__,__LINE__,tmp); 214 fflush(stderr); 215 free(tmp); 216 #endif 217 json_object_put(arg->res); 218 InternetCloseHandle(&hInternet); 219 isOngoing=0; 220 freeMaps(&arg->conf); 221 free(arg->conf); 222 freeMap(&arg->url); 223 if(arg->url!=NULL) 224 free(arg->url); 225 pthread_exit(NULL); 226 } 227 228 /** 229 * Invoke the callback in case there is a [subscriber] section containing one 230 * or more url parameter. 231 * 232 * @param conf the maps containing the main configuration file definitions 233 * @param state the service state SERVICE_SUCCEEDED / STARTED / FAILED 234 * @return bool true in case of success, false in other cases 235 */ 236 bool invokeBasicCallback(maps* conf,int state){ 237 map* url=getMapFromMaps(conf,"subscriber","inProgressUri"); 238 if(state==SERVICE_SUCCEEDED) 239 url=getMapFromMaps(conf,"subscriber","successUri"); 240 else 241 if(state==SERVICE_FAILED) 242 url=getMapFromMaps(conf,"subscriber","failedUri"); 243 if(url==NULL) 244 return false; 245 map* url0=createMap("url",url->value); 246 map* sname=getMapFromMaps(conf,"lenv","identifier"); 247 if(sname!=NULL && isProhibited(conf,sname->value)) 248 return false; 249 if(state<cStep) 250 return true; 251 if(cStep!=state || isOngoing==0){ 252 json_object *res=NULL; 253 if(state==SERVICE_SUCCEEDED || state==SERVICE_FAILED){ 254 maps* pmsTmp=getMaps(conf,"lenv"); 255 setMapInMaps(conf,"lenv","no-write","true"); 256 map* pmTmp=getMapFromMaps(conf,"lenv","usid"); 257 if(pmTmp!=NULL){ 258 map* pmResponse=getMapFromMaps(conf,"lenv","jsonStr"); 259 res=parseJson(conf,pmResponse->value); 260 } 261 }else 262 res=createStatus(conf,state); 263 if(local_arguments==NULL) 264 local_arguments=(local_params**)malloc(sizeof(local_params*)); 265 else 266 local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*)); 267 local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 268 local_arguments[nbThreads]->conf=dupMaps(&conf); 269 local_arguments[nbThreads]->url=url0; 270 local_arguments[nbThreads]->res=res; 271 local_arguments[nbThreads]->step=0; 272 local_arguments[nbThreads]->state=state; 273 cStep=state; 274 if(myThreads==NULL) 275 myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t)); 276 else 277 myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t)); 278 if(pthread_create(&myThreads[nbThreads], NULL, _invokeBasicCallback, (void*)local_arguments[nbThreads])==-1){ 279 setMapInMaps(conf,"lenv","message",_("Unable to create a new thread")); 280 return false; 281 } 282 nbThreads++; 283 } 284 return true; 285 } 286 115 287 /** 116 288 * Practically invoke the callback, meaning sending the HTTP POST request. … … 336 508 map* tmpMap=getMapArray(curs->content,"value",ii); 337 509 char tmpStr[100]; 338 sprintf(tmpStr,"% d",strlen(tmpMap->value));510 sprintf(tmpStr,"%ld",strlen(tmpMap->value)); 339 511 setMapArray(curs->content,"size",ii,tmpStr); 340 512 tmpMap=getMapArray(curs->content,"mimeType",ii); … … 570 742 jsStr=json_object_new_string(tmp2->value); 571 743 json_object_object_add(res2,"target_path",jsStr); 572 json_object *res4=json_object_object_get(res1,tmp0->value); 573 if(json_object_is_type(res4,json_type_null)){ 574 json_object_object_add(res1,tmp0->value,res2); 575 }else{ 576 if(json_object_is_type(res4,json_type_object) && !json_object_is_type(res4, json_type_array)){ 577 json_object *res3=json_object_new_array(); 578 json_object_array_add(res3,json_object_get(res4)); 579 json_object_array_add(res3,res2); 580 json_object_object_del(res1,tmp0->value); 581 json_object_object_add(res1,tmp0->value,res3); 582 }else 583 json_object_array_add(res4,res2); 744 json_object *res4=NULL; 745 if(json_object_object_get_ex(res1,tmp0->value,&res4)!=FALSE){ 746 if(json_object_is_type(res4,json_type_null)){ 747 json_object_object_add(res1,tmp0->value,res2); 748 }else{ 749 if(json_object_is_type(res4,json_type_object) && !json_object_is_type(res4, json_type_array)){ 750 json_object *res3=json_object_new_array(); 751 json_object_array_add(res3,json_object_get(res4)); 752 json_object_array_add(res3,res2); 753 json_object_object_del(res1,tmp0->value); 754 json_object_object_add(res1,tmp0->value,res3); 755 }else 756 json_object_array_add(res4,res2); 757 } 584 758 } 585 759 } … … 866 1040 */ 867 1041 void cleanupCallbackThreads(){ 1042 while( isOngoing>0 ){ 1043 zSleep(100); 1044 } 868 1045 int i=0; 869 1046 for(i=0;i<nbThreads;i++){
Note: See TracChangeset
for help on using the changeset viewer.