- Timestamp:
- Nov 21, 2017, 10:24:14 AM (6 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/prototype-v0/zoo-project/zoo-kernel/service_callback.c
r851 r854 30 30 #include "service_json.h" 31 31 #include "service_internal_ms.h" 32 #include "sqlapi.h" 32 33 #include <pthread.h> 34 #include <libxml/tree.h> 35 #include <libxml/parser.h> 36 #include <libxml/xpath.h> 37 #include <libxml/xpathInternals.h> 38 39 #include <libxslt/xslt.h> 40 #include <libxslt/xsltInternals.h> 41 #include <libxslt/transform.h> 42 #include <libxslt/xsltutils.h> 33 43 34 44 #ifdef __cplusplus … … 36 46 #endif 37 47 48 /** 49 * Parameter definition to be used for sending parameters to a thread. 50 */ 51 typedef struct { 52 maps *conf; //!< the main configuration file 53 map *url; //!< the callback url maps 54 json_object *res;//!< the JSON object to post 55 int step; //!< the current step [0,6] 56 int state; //!< the current state [0,1] 57 } local_params; 58 59 /** 60 * Number of threads 61 */ 38 62 int nbThreads=0; 63 /** 64 * Current step 65 */ 39 66 int cStep=0; 67 /** 68 * Is there any ongoing HTTP request 69 */ 70 int isOngoing=0; 71 /** 72 * Threads array 73 */ 40 74 pthread_t* myThreads=NULL; 75 /** 76 * Steps array 77 */ 41 78 bool steps[7][2]={ 42 79 {false,false}, … … 48 85 {false,false} 49 86 }; 87 /** 88 * Arguments array to give to the _invokeCallback thread's function 89 */ 90 local_params** local_arguments; 50 91 51 92 /** … … 71 112 } 72 113 73 /**74 * Parameter definition to be used for sending parameters to a thread.75 */76 typedef struct {77 maps *conf; //!< the main configuration file78 map *url; //!< the callback url maps79 json_object *res;//!< the JSON object to post80 int step; //!< the current step [0,6]81 int state; //!< the current state [0,1]82 } local_params;83 84 /**85 * Verify if the URL should use a shared cache or not.86 *87 * In case the security section contains a key named "shared", then if the88 * domain listed in the shared key are contained in the url given as parameter89 * then it return "SHARED" in other cases, it returns "OTHER".90 *91 * @param conf the main configuration file maps92 * @param url the URL to evaluate93 * @return a string "SHARED" in case the host is in a domain listed in the94 * shared key, "OTHER" in other cases.95 */96 char* getProvenance(maps* conf,const char* url){97 map* sharedCache=getMapFromMaps(conf,"security","shared");98 if(sharedCache!=NULL){99 char *res=NULL;100 char *hosts=sharedCache->value;101 char *curs=strtok(hosts,",");102 while(curs!=NULL){103 if(strstr(url,curs)==NULL)104 res="OTHER";105 else{106 res="SHARED";107 return res;108 }109 }110 return res;111 }112 return "OTHER";113 }114 114 115 115 /** … … 133 133 hInternet.waitingRequests[0] = zStrdup(URL); 134 134 free(URL); 135 fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__); 135 #ifdef CALLBACK_DEBUG 136 fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS cStep %d %d\n",pthread_self(),__FILE__,__LINE__,cStep,isOngoing); 136 137 fprintf(stderr," * JSON: [%s] \n",jsonStr); 137 138 fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]); 138 139 fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__); 139 while( cStep!=7 && 140 #endif 141 while( (arg->step!=7 || isOngoing>0) && 140 142 ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) ) 141 143 ){ 142 sleep(1); 143 } 144 struct timespec tv; 145 tv.tv_sec = 0; 146 tv.tv_nsec = (long) 5*1e+9; 147 nanosleep(&tv, &tv); 148 //sleep(1); 149 } 150 isOngoing=1; 151 #ifdef CALLBACK_DEBUG 144 152 fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__); 145 153 int i=0; 146 154 for(i=0;i<7;i++){ 147 155 fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]); 148 fflush(stderr); 149 } 150 fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n",pthread_self(),__FILE__,__LINE__); 156 } 157 #endif 158 const struct tm *tm; 159 size_t len; 160 time_t now; 161 char *tmp1; 162 map *tmpStatus; 163 164 now = time ( NULL ); 165 tm = localtime ( &now ); 166 167 tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char)); 168 len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm ); 169 170 #ifdef CALLBACK_DEBUG 171 fprintf(stderr,"************************* From thread %d %s %d: REQUEST START (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1); 151 172 fflush(stderr); 173 #endif 174 free(tmp1); 152 175 res1 = InternetOpenUrl (&hInternet, 153 176 hInternet.waitingRequests[0], … … 158 181 //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x 159 182 processDownloads(&hInternet); 160 fprintf(stderr,"************************* From thread %d %s %d: REQUEST END\n\n",pthread_self(),__FILE__,__LINE__); 161 fflush(stderr); 183 now = time ( NULL ); 184 tm = localtime ( &now ); 185 tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char)); 186 len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm ); 187 188 #ifdef CALLBACK_DEBUG 189 fprintf(stderr,"************************* From thread %d %s %d: REQUEST END (%s)\n\n",pthread_self(),__FILE__,__LINE__,tmp1); 190 #endif 191 free(tmp1); 162 192 char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1) 163 193 * sizeof (char)); … … 177 207 json_object_put(arg->res); 178 208 InternetCloseHandle(&hInternet); 209 isOngoing=0; 179 210 if(cStep==0 || cStep==6 || arg->state==1) 180 211 cStep=arg->step+1; 181 steps[arg->step][arg->state]=true; 212 #ifdef CALLBACK_DEBUG 182 213 fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT\n",pthread_self(),__FILE__,__LINE__); 183 214 for(i=0;i<7;i++){ … … 187 218 fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__); 188 219 fflush(stderr); 220 #endif 221 steps[arg->step][arg->state]=true; 189 222 free(tmp); 190 free(args); 223 //free(args); 224 fprintf(stderr,"************************* From thread %d %s %d: EXIT\n\n",pthread_self(),__FILE__,__LINE__); 225 fflush(stderr); 191 226 pthread_exit(NULL); 192 227 } … … 237 272 238 273 tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char)); 239 240 len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm ); 274 len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%H:%M:%SZ", tm ); 241 275 json_object *jsStr0=json_object_new_string(tmp1); 242 276 json_object_object_add(res,"datetime",jsStr0); 243 277 free(tmp1); 278 244 279 switch(step){ 245 280 case 0: { 246 281 // Create a new analyze 247 282 maps* lenv=getMaps(conf,"lenv"); 248 sid=getMapFromMaps(conf," lenv","xrequest");283 sid=getMapFromMaps(conf,"renv","xrequest"); 249 284 if(sid!=NULL){ 250 285 json_object *jsStr=json_object_new_string(sid->value); … … 256 291 json_object_object_add(res,"process_identifier",jsStr); 257 292 } 293 // Save the Execute request on disk 294 map* tmpPath=getMapFromMaps(conf,"main","tmpPath"); 295 map* req=getMapFromMaps(conf,"renv","xrequest"); 296 sid=getMapFromMaps(conf,"lenv","usid"); 297 char* executePath=(char*)malloc((strlen(tmpPath->value)+strlen(sid->value)+14)*sizeof(char)); 298 sprintf(executePath,"%s/execute_%s.xml",tmpPath->value,sid->value); 299 FILE* saveExecute=fopen(executePath,"wb"); 300 fwrite(req->value,1,strlen(req->value)*sizeof(char),saveExecute); 301 fflush(saveExecute); 302 fclose(saveExecute); 303 setMapInMaps(conf,"lenv","execute_file",executePath); 304 free(executePath); 258 305 break; 259 306 } 307 260 308 case 1: { 261 309 // Fetching data inputs 262 310 maps* curs=inputs; 263 char *keys[8][2]={ 311 dumpMaps(curs); 312 char *keys[10][2]={ 264 313 { 265 314 "href", … … 287 336 }, 288 337 { 289 "ref_wfs_link", 290 "ref_wfs_link" 338 "ref_wcs_link", 339 "ref_wcs_link" 340 }, 341 { 342 "ref_wcs_preview_link", 343 "ref_wcs_preview_link" 291 344 }, 292 345 { 293 346 "geodatatype", 294 347 "datatype" 348 }, 349 { 350 "wcs_extent", 351 "boundingbox" 295 352 } 296 353 }; … … 300 357 sid=getMap(curs->content,"ref_wms_link"); 301 358 json_object *res2=json_object_new_object(); 302 if(tmpMap!=NULL && sid==NULL){ 303 addToMap(curs->content,"generated_file",tmpMap->value); 359 if(tmpMap!=NULL){ 360 if(sid==NULL){ 361 addToMap(curs->content,"generated_file",tmpMap->value); 362 addToMap(curs->content,"storage",tmpMap->value); 363 } 364 fprintf(stderr,"%s %d\n",__FILE__,__LINE__); 365 dumpMap(curs->content); 304 366 struct stat buf; 305 367 char timeStr[ 100 ] = ""; … … 314 376 } 315 377 setReferenceUrl(conf,curs); 316 //outputMapfile(conf,curs);317 dumpMaps(curs);318 378 } 319 379 int i=0; 320 380 int hasRef=-1; 321 for(;i< 8;i++){381 for(;i<10;i++){ 322 382 sid=getMap(curs->content,keys[i][0]); 323 383 if(sid!=NULL){ … … 340 400 break; 341 401 } 402 342 403 case 2: { 404 // Update the execute request stored on disk at step 1,1 to modify the references used. 405 if(state==0){ 406 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 407 fflush(stderr); 408 maps* curs=inputs; 409 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 410 fflush(stderr); 411 xmlInitParser(); 412 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 413 fflush(stderr); 414 map* xmlPath=getMapFromMaps(conf,"lenv","execute_file"); 415 dumpMap(xmlPath); 416 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 417 fflush(stderr); 418 while(curs!=NULL){ 419 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 420 fflush(stderr); 421 dumpMap(curs->content); 422 //map* bvMap=getMap(curs->content,"byValue"); 423 // TODO handle mapArray 424 //if(bvMap!=NULL && strncasecmp(bvMap->value,"true",4)==0){ 425 if(getMap(curs->content,"href")==NULL && getMap(curs->content,"mimeType")!=NULL){ 426 map* tmpMap=getMap(curs->content,"cache_file"); 427 addToMap(curs->content,"generated_file",tmpMap->value); 428 addToMap(curs->content,"storage",tmpMap->value); 429 setReferenceUrl(conf,curs); 430 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 431 fflush(stderr); 432 dumpMap(curs->content); 433 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 434 fflush(stderr); 435 const char *params[5]; 436 int xmlLoadExtDtdDefaultValue; 437 int hasFile=-1; 438 map* xslPath=getMapFromMaps(conf,"callback","template"); 439 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 440 fflush(stderr); 441 dumpMap(xslPath); 442 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 443 fflush(stderr); 444 map* filePath=getMap(curs->content,"ref_wfs_link"); 445 if(filePath==NULL) 446 filePath=getMap(curs->content,"ref_wcs_link"); 447 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 448 fflush(stderr); 449 dumpMap(filePath); 450 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 451 fflush(stderr); 452 char* inputName=curs->name; 453 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 454 fflush(stderr); 455 if(xslPath==NULL || xmlPath==NULL || filePath==NULL) 456 break; 457 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 458 fflush(stderr); 459 char *tmpParam=(char*)malloc((strlen(curs->name)+11)*sizeof(char)); 460 char *tmpParam1=(char*)malloc((strlen(filePath->value)+11)*sizeof(char)); 461 sprintf(tmpParam,"string(\"%s\")",curs->name); 462 sprintf(tmpParam1,"string(\"%s\")",filePath->value); 463 params[0]="attr"; 464 params[1]=tmpParam; 465 params[2]="value"; 466 params[3]=tmpParam1;//filePath->value; 467 params[4]=NULL; 468 fprintf(stderr, "## XSLT PARAMETERS ATTR: %s VALUE: %s \n", 469 tmpParam,tmpParam1); 470 xmlSubstituteEntitiesDefault(1); 471 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 472 fflush(stderr); 473 xmlLoadExtDtdDefaultValue = 0; 474 xsltStylesheetPtr cur = NULL; 475 xmlDocPtr doc, res; 476 cur = xsltParseStylesheetFile(BAD_CAST xslPath->value); 477 doc = xmlParseFile(xmlPath->value); 478 fflush(stderr); 479 res = xsltApplyStylesheet(cur, doc, params); 480 xmlChar *xmlbuff; 481 int buffersize; 482 xmlDocDumpFormatMemory(res, &xmlbuff, &buffersize, 1); 483 // Store the executeRequest in file again 484 free(tmpParam); 485 free(tmpParam1); 486 fprintf(stderr," # Request / XSLT: %s\n",xmlbuff); 487 fflush(stderr); 488 FILE* saveExecute=fopen(xmlPath->value,"wb"); 489 fwrite(xmlbuff,1,buffersize,saveExecute); 490 fflush(saveExecute); 491 fclose(saveExecute); 492 xmlFree(xmlbuff); 493 xmlFreeDoc(doc); 494 xsltFreeStylesheet(cur); 495 } 496 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 497 fflush(stderr); 498 curs=curs->next; 499 } 500 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 501 fflush(stderr); 502 xmlCleanupParser(); 503 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 504 fflush(stderr); 505 FILE* f0=fopen(xmlPath->value,"rb"); 506 if(f0!=NULL){ 507 long flen; 508 char *fcontent; 509 fseek (f0, 0, SEEK_END); 510 flen = ftell (f0); 511 fseek (f0, 0, SEEK_SET); 512 fcontent = (char *) malloc ((flen + 1) * sizeof (char)); 513 fread(fcontent,flen,1,f0); 514 fcontent[flen]=0; 515 fclose(f0); 516 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 517 fflush(stderr); 518 map *schema=getMapFromMaps(conf,"database","schema"); 519 map* sid=getMapFromMaps(conf,"lenv","usid"); 520 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 521 fflush(stderr); 522 char *req=(char*)malloc((flen+strlen(schema->value)+strlen(sid->value)+66)*sizeof(char)); 523 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 524 fflush(stderr); 525 sprintf(req,"UPDATE %s.services set request_execute_content=$$%s$$ WHERE uuid=$$%s$$",schema->value,fcontent,sid->value); 526 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 527 fflush(stderr); 528 execSql(conf,1,req); 529 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 530 fflush(stderr); 531 free(fcontent); 532 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 533 fflush(stderr); 534 free(req); 535 fprintf(stderr,"%s %d \n",__FILE__,__LINE__); 536 fflush(stderr); 537 } 538 } 539 343 540 // Uploading data input to cluster 344 541 maps* in=getMaps(conf,"uploadQueue"); … … 371 568 break; 372 569 } 570 373 571 case 3: { 374 572 // Generating job script … … 380 578 break; 381 579 } 580 382 581 case 4: { 383 582 // Submitting job to cluster … … 389 588 break; 390 589 } 590 391 591 case 5: { 392 592 // Downloading process outputs from cluster 393 593 //json_object* in=mapsToJson(outputs); 394 dumpMaps(outputs);395 594 maps* curs=outputs; 396 char *keys[8][2]={ 595 dumpMaps(curs); 596 char *keys[10][2]={ 397 597 { 398 598 "Reference", … … 400 600 }, 401 601 { 402 " storage",602 "generated_file", 403 603 "cachefile" 404 604 }, 405 605 { 406 " fmimeType",606 "mimeType", 407 607 "mimetype" 408 608 }, … … 412 612 }, 413 613 { 614 "geodatatype", 615 "datatype" 616 }, 617 { 618 "wms_extent", 619 "boundingbox" 620 }, 621 { 414 622 "ref_wms_link", 415 623 "ref_wms_link" … … 420 628 }, 421 629 { 630 "ref_wcs_link", 631 "ref_wcs_preview_link" 632 }, 633 { 422 634 "ref_wfs_link", 423 635 "ref_wfs_link" 424 },425 {426 "geodatatype",427 "datatype"428 636 } 637 }; 638 char* specifics[5][2]={ 639 { 640 "download_link", 641 "ref_download_link" 642 }, 643 { 644 "wms_link", 645 "ref_wms_link" 646 }, 647 { 648 "wfs_link", 649 "ref_wfs_link" 650 }, 651 { 652 "wcs_link", 653 "ref_wcs_link" 654 }, 655 { 656 "wcs_link", 657 "ref_wcs_preview_link" 658 } 429 659 }; 430 660 json_object *res1=json_object_new_object(); 431 661 while(curs!=NULL){ 432 map* tmpMap=getMap(curs->content,"cache_file");433 sid=getMap(curs->content,"ref_wms_link");434 662 json_object *res2=json_object_new_object(); 435 663 int i=0; 436 664 int hasRef=-1; 437 for(;i< 8;i++){665 for(;i<10;i++){ 438 666 sid=getMap(curs->content,keys[i][0]); 439 667 if(sid!=NULL){ … … 448 676 else{ 449 677 maps* curs0=curs->child; 450 while(curs0!=NULL){ 451 json_object *res3=json_object_new_object(); 452 int i0=0; 453 int hasRef0=-1; 454 for(;i0<8;i0++){ 455 sid=getMap(curs0->content,keys[i0][0]); 456 if(sid!=NULL){ 457 json_object *jsStr=json_object_new_string(sid->value); 458 json_object_object_add(res3,keys[i0][1],jsStr); 459 //if(i0==0) 460 hasRef0=1; 678 int i=0; 679 int bypass=-1; 680 for(i=0;i<5;i++){ 681 maps* specificMaps; 682 if((specificMaps=getMaps(curs0,specifics[i][0]))!=NULL){ 683 int hasRef0=-1; 684 int i0=0; 685 for(;i0<6;i0++){ 686 sid=getMap(specificMaps->content,keys[i0][0]); 687 if(sid!=NULL){ 688 json_object *jsStr=json_object_new_string(sid->value); 689 if(i0==0){ 690 json_object_object_add(res2,specifics[i][1],jsStr); 691 } 692 else 693 json_object_object_add(res2,keys[i0][1],jsStr); 694 hasRef0=1; 695 bypass=1; 696 if(i==1){ 697 struct stat buf; 698 char timeStr[ 100 ] = ""; 699 if (stat(sid->value, &buf)==0){ 700 strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime)); 701 json_object *jsStr=json_object_new_string(timeStr); 702 json_object_object_add(res2,"creation_date",jsStr); 703 } 704 } 705 } 706 } 707 } 708 } 709 if(bypass<0) 710 while(curs0!=NULL){ 711 json_object *res3=json_object_new_object(); 712 int i0=0; 713 int hasRef0=-1; 714 for(;i0<10;i0++){ 715 sid=getMap(curs0->content,keys[i0][0]); 716 if(sid!=NULL){ 717 json_object *jsStr=json_object_new_string(sid->value); 718 json_object_object_add(res3,keys[i0][1],jsStr); 719 //if(i0==0) 720 hasRef0=1; 721 } 461 722 } 723 if(hasRef0<0) 724 json_object_put(res3); 725 else 726 json_object_object_add(res2,curs0->name,res3); 727 curs0=curs0->next; 462 728 } 463 if(hasRef0<0)464 json_object_put(res3);465 else466 json_object_object_add(res2,curs0->name,res3);467 curs0=curs0->next;468 }469 729 json_object_object_add(res1,curs->name,res2); 470 730 } … … 474 734 break; 475 735 } 736 476 737 case 6: { 477 738 // Finalize HPC 478 sid=getMapFromMaps(conf,"lenv","local_script"); 479 if(sid!=NULL){ 480 json_object *jsStr=json_object_new_string(sid->value); 481 json_object_object_add(res,"inputs",jsStr); 482 } 739 char *keys[6][2]={ 740 { 741 "SubmitTime", 742 "hpc_submission_date" 743 }, 744 { 745 "JobId", 746 "hpc_job_identifier" 747 }, 748 { 749 "JobName", 750 "hpc_job_name" 751 }, 752 { 753 "StartTime", 754 "hpc_start_date" 755 }, 756 { 757 "EndTime", 758 "hpc_end_date" 759 }, 760 { 761 "JobState", 762 "hpc_status" 763 } 764 }; 765 int i=0; 766 if(getMaps(conf,"henv")!=NULL){ 767 for(i=0;i<6;i++){ 768 sid=getMapFromMaps(conf,"henv",keys[i][0]); 769 if(sid!=NULL){ 770 json_object *jsStr=json_object_new_string(sid->value); 771 json_object_object_add(res,keys[i][1],jsStr); 772 } 773 } 774 } 775 if((sid=getMapFromMaps(conf,"henv","billing_nb_cpu"))!=NULL){ 776 json_object *jsStr=json_object_new_string(sid->value); 777 json_object_object_add(res,"hpc_cpu_usage",jsStr); 778 }else{ 779 json_object *jsStr=json_object_new_string("1"); 780 json_object_object_add(res,"hpc_cpu_usage",jsStr); 781 } 782 json_object *jsStr=json_object_new_string("succeeded"); 783 json_object_object_add(res,"wps_status",jsStr); 483 784 break; 484 785 } 786 485 787 case 7: { 486 788 // Error or Dismiss … … 490 792 json_object_object_add(res,"message",jsStr); 491 793 } 492 json_object *jsStr=json_object_new_string("failed"); 794 json_object *jsStr; 795 if(state==1) 796 jsStr=json_object_new_string("dismissed"); 797 else 798 jsStr=json_object_new_string("failed"); 493 799 json_object_object_add(res,"wps_status",jsStr); 494 800 break; … … 498 804 } 499 805 } 500 501 local_params* argumentsA=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 502 argumentsA->conf=conf; 503 argumentsA->url=url; 504 argumentsA->res=res; 505 argumentsA->step=step; 506 argumentsA->state=state; 806 807 if(local_arguments==NULL) 808 local_arguments=(local_params**)malloc(sizeof(local_params*)); 809 else 810 local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*)); 811 local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 812 local_arguments[nbThreads]->conf=conf; 813 local_arguments[nbThreads]->url=url; 814 local_arguments[nbThreads]->res=res; 815 local_arguments[nbThreads]->step=step; 816 local_arguments[nbThreads]->state=state; 507 817 //pthread_t p1; 508 818 if(myThreads==NULL) … … 510 820 else 511 821 myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t)); 512 if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*) argumentsA)==-1){822 if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)local_arguments[nbThreads])==-1){ 513 823 setMapInMaps(conf,"lenv","message",_("Unable to create a new thread")); 514 824 return false; … … 519 829 } 520 830 831 /** 832 * Wait for the threads to end then, clean used memory. 833 */ 521 834 void cleanupCallbackThreads(){ 522 835 int i=0; 523 836 for(i=0;i<nbThreads;i++){ 837 fprintf(stderr,"%s %d %d \n",__FILE__,__LINE__,i); 838 fflush(stderr); 524 839 pthread_join(myThreads[i],NULL); 525 } 840 free(local_arguments[i]); 841 } 842 free(local_arguments); 526 843 free(myThreads); 527 844 }
Note: See TracChangeset
for help on using the changeset viewer.