- Timestamp:
- Mar 19, 2015, 10:01:11 AM (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/PublicaMundi_David-devel/zoo-project/zoo-kernel/zoo_loader.c
r553 r617 67 67 68 68 #include "service_zcfg.h" 69 #include "zoo_json.h" 70 #include "zoo_amqp.h" 71 #include "zoo_sql.h" 69 72 //#include "service_internal.h" 73 74 75 void 76 loadServiceAndRun (maps ** myMap, service * s1, map * request_inputs, 77 maps ** inputs, maps ** ioutputs, int *eres,FCGX_Stream *out, FCGX_Stream *err); 70 78 71 79 xmlXPathObjectPtr extractFromDoc (xmlDocPtr, const char *); … … 100 108 int pid = getpid(); 101 109 struct cgi_env *cgi; 102 //PrintEnv(request.err, "Request environment", request.envp);103 110 cgi = (struct cgi_env*)malloc(sizeof(struct cgi_env)); 104 111 cgiMain_init (NULL, NULL,&cgi,request); … … 455 462 if (strQuery != NULL) 456 463 free (strQuery); 457 464 /* 465 json_object *obj; 466 maptojson(&obj,tmpMap); 467 fprintf(stderr,"%s\n",json_object_to_json_string(obj)); 468 fflush(stderr); 469 */ 458 470 runRequest (&tmpMap,&cgi,request); 459 471 … … 525 537 return 1; 526 538 } 527 539 /* 540 json_object *jobj; 541 mapstojson(&jobj,conf); 542 fprintf (stderr,"The json object created: %s\n",json_object_to_json_string(jobj)); 543 freeMaps(&conf); 544 545 maps *conf_tmp; 546 jsontomaps(jobj,&conf_tmp); 547 dumpMaps(conf_tmp); 548 return 1; 549 */ 528 550 char *rootDir; 529 551 map *m_rootDir = getMapFromMaps (conf, "server", "rootDir"); … … 684 706 } 685 707 } 708 709 710 char * amqp_host; 711 map * m_amqp_host = getMapFromMaps (conf, "rabbitmq", "host"); 712 if (m_amqp_host == NULL){ 713 fprintf(stderr,"Configuration error: [rabbitmq] host"); 714 return 2; 715 } 716 else { 717 amqp_host = (char *)malloc((strlen(m_amqp_host->value) +1)*sizeof(char*)); 718 strncpy(amqp_host,m_amqp_host->value,strlen(m_amqp_host->value)); 719 amqp_host[strlen(m_amqp_host->value)] = '\0'; 720 } 721 722 int amqp_port; 723 map *m_amqp_port = getMapFromMaps (conf, "rabbitmq", "port"); 724 if (m_amqp_port == NULL){ 725 fprintf(stderr,"Configuration error: [rabbitmq] port"); 726 return 2; 727 } 728 else { 729 amqp_port = atoi(m_amqp_port->value); 730 if (amqp_port == 0){ 731 fprintf(stderr,"Configuration error: [rabbitmq] port"); 732 return 2; 733 } 734 } 735 736 char * amqp_user; 737 map * m_amqp_user = getMapFromMaps (conf, "rabbitmq", "user"); 738 if (m_amqp_user == NULL){ 739 fprintf(stderr,"Configuration error: [rabbitmq] user"); 740 return 2; 741 } 742 else { 743 amqp_user = (char *)malloc((strlen(m_amqp_user->value) +1)*sizeof(char*)); 744 strncpy(amqp_user,m_amqp_user->value,strlen(m_amqp_user->value)); 745 amqp_user[strlen(m_amqp_user->value)] = '\0'; 746 } 747 748 char * amqp_passwd; 749 map * m_amqp_passwd = getMapFromMaps (conf, "rabbitmq", "passwd"); 750 if (m_amqp_passwd == NULL){ 751 fprintf(stderr,"Configuration error: [rabbitmq] passwd"); 752 return 2; 753 } 754 else { 755 amqp_passwd = (char *)malloc((strlen(m_amqp_passwd->value) +1)*sizeof(char*)); 756 strncpy(amqp_passwd,m_amqp_passwd->value,strlen(m_amqp_passwd->value)); 757 amqp_passwd[strlen(m_amqp_passwd->value)] = '\0'; 758 } 759 760 char * amqp_exchange; 761 map * m_amqp_exchange = getMapFromMaps (conf, "rabbitmq", "exchange"); 762 if (m_amqp_exchange == NULL){ 763 fprintf(stderr,"Configuration error: [rabbitmq] exchange"); 764 return 2; 765 } 766 else { 767 amqp_exchange = (char *)malloc((strlen(m_amqp_exchange->value) +1)*sizeof(char*)); 768 strncpy(amqp_exchange,m_amqp_exchange->value,strlen(m_amqp_exchange->value)); 769 amqp_exchange[strlen(m_amqp_exchange->value)] = '\0'; 770 } 771 772 char * amqp_routingkey; 773 map * m_amqp_routingkey = getMapFromMaps (conf, "rabbitmq", "routingkey"); 774 if (m_amqp_routingkey == NULL){ 775 fprintf(stderr,"Configuration error: [amqp] routingkey"); 776 return 2; 777 } 778 else { 779 amqp_routingkey = (char *)malloc((strlen(m_amqp_routingkey->value) +1)*sizeof(char*)); 780 strncpy(amqp_routingkey,m_amqp_routingkey->value,strlen(m_amqp_routingkey->value)); 781 amqp_routingkey[strlen(m_amqp_routingkey->value)] = '\0'; 782 } 783 784 char * amqp_queue; 785 map * m_amqp_queue = getMapFromMaps (conf, "rabbitmq", "queue"); 786 if (m_amqp_queue == NULL){ 787 fprintf(stderr,"Configuration error: [rabbitmq] queue"); 788 return 2; 789 } 790 else { 791 amqp_queue = (char *)malloc((strlen(m_amqp_queue->value) +1)*sizeof(char*)); 792 strncpy(amqp_queue,m_amqp_queue->value,strlen(m_amqp_queue->value)); 793 amqp_queue[strlen(m_amqp_queue->value)] = '\0'; 794 } 795 796 char * status_user; 797 map * m_status_user = getMapFromMaps (conf, "status", "user"); 798 if (m_status_user == NULL){ 799 fprintf(stderr,"Configuration error: [status] user"); 800 return 2; 801 } 802 else { 803 status_user = (char *)malloc((strlen(m_status_user->value) +1)*sizeof(char*)); 804 strncpy(status_user,m_status_user->value,strlen(m_status_user->value)); 805 status_user[strlen(m_status_user->value)] = '\0'; 806 } 807 808 809 char * status_passwd; 810 map * m_status_passwd = getMapFromMaps (conf, "status", "passwd"); 811 if (m_status_passwd == NULL){ 812 fprintf(stderr,"Configuration error: [status] passwd"); 813 return 2; 814 } 815 else { 816 status_passwd = (char *)malloc((strlen(m_status_passwd->value) +1)*sizeof(char*)); 817 strncpy(status_passwd,m_status_passwd->value,strlen(m_status_passwd->value)); 818 status_passwd[strlen(m_status_passwd->value)] = '\0'; 819 } 820 821 char * status_bdd; 822 map * m_status_bdd = getMapFromMaps (conf, "status", "bdd"); 823 if (m_status_bdd == NULL){ 824 fprintf(stderr,"Configuration error: [status] bdd"); 825 return 2; 826 } 827 else { 828 status_bdd = (char *)malloc((strlen(m_status_bdd->value) +1)*sizeof(char*)); 829 strncpy(status_bdd,m_status_bdd->value,strlen(m_status_bdd->value)); 830 status_bdd[strlen(m_status_bdd->value)] = '\0'; 831 } 832 833 char * status_host; 834 map * m_status_host = getMapFromMaps (conf, "status", "host"); 835 if (m_status_host == NULL){ 836 fprintf(stderr,"Configuration error: [status] host"); 837 return 2; 838 } 839 else { 840 status_host = (char *)malloc((strlen(m_status_host->value) +1)*sizeof(char*)); 841 strncpy(status_host,m_status_host->value,strlen(m_status_host->value)); 842 status_host[strlen(m_status_host->value)] = '\0'; 843 } 844 845 int status_port; 846 map *m_status_port = getMapFromMaps (conf, "status", "port"); 847 if (m_status_port == NULL){ 848 fprintf(stderr,"Configuration error: [status] port"); 849 return 2; 850 } 851 else { 852 status_port = atoi(m_status_port->value); 853 if (status_port == 0){ 854 fprintf(stderr,"Configuration error: [status] port"); 855 return 2; 856 } 857 } 858 init_sql(status_host,status_user,status_passwd,status_bdd,status_port); 686 859 687 860 int sock = FCGX_OpenSocket(listen, listen_queue); … … 705 878 return 3; 706 879 } 707 880 881 init_amqp(amqp_host,amqp_port,amqp_user, amqp_passwd, amqp_exchange, amqp_routingkey,amqp_queue); 882 883 708 884 int fork_status = fork(); 709 885 if (fork_status == 0){ 710 886 //child 711 int forker_pid = getpid(); 887 int master_sync= getpid(); 888 fprintf(stderr,"Master sync%d\n",getpid()); 712 889 FCGX_Init(); 713 890 FCGX_Request request; … … 718 895 fork_status = fork(); 719 896 if (fork_status == 0){ 720 fprintf(stderr,"child %d \n",i);897 fprintf(stderr,"child sync %d \n",getpid()); 721 898 fflush(stderr); 722 899 break; … … 724 901 } 725 902 while(1){ 726 if (forker_pid != getpid()){ 903 /* mode synchrone */ 904 if (master_sync != getpid()){ 727 905 while(FCGX_Accept_r(&request) == 0){ 728 906 process(&request); … … 737 915 else { 738 916 wait(0); 739 fprintf(stderr,"new child\n"); 917 fprintf(stderr,"Master sync %d\n",getpid()); 918 fprintf(stderr,"New sync Child\n"); 740 919 fflush(stderr); 741 920 fork(); … … 744 923 } 745 924 else { 746 747 while(1); 925 int master_async = getpid(); 926 fprintf(stderr,"Master async %d\n",master_async); 927 int fork_s; 928 int j; 929 for (j = 0; j< async_worker; j++){ 930 fork_s = fork(); 931 if (fork_s == 0){ 932 fprintf(stderr,"child async %d \n",getpid()); 933 fflush(stderr); 934 break; 935 } 936 } 937 json_object *msg_obj; 938 json_object *maps_obj; 939 maps * map_c; 940 json_object *req_format_jobj; 941 maps * request_input_real_format; 942 json_object *req_jobj; 943 map * request_inputs; 944 json_object *outputs_jobj; 945 maps * request_output_real_format; 946 947 char *msg; 948 int c; 949 int eres; 950 char * service_identifier; 951 service * s1 = NULL; 952 while(1){ 953 /* mode asynchrone */ 954 if( master_async != getpid()){ 955 /*traitement des requetes de la queue */ 956 bind_amqp(); 957 init_consumer(); 958 while(1){ 959 960 c = consumer_amqp(&msg); 961 if (c == 0) 962 break; 963 msg_obj = json_tokener_parse(msg); 964 965 free(msg); 966 maps_obj = json_object_object_get(msg_obj,"maps"); 967 968 map_c = jsontomaps(maps_obj); 969 970 req_format_jobj = json_object_object_get(msg_obj,"request_input_real_format"); 971 request_input_real_format = jsontomaps(req_format_jobj); 972 973 req_jobj = json_object_object_get(msg_obj,"request_inputs"); 974 request_inputs = jsontomap(req_jobj); 975 976 outputs_jobj = json_object_object_get(msg_obj,"request_output_real_format"); 977 request_output_real_format = jsontomaps(outputs_jobj); 978 979 json_object_put(msg_obj); 980 981 /* traitemement du message */ 982 /* Recherche des references */ 983 maps* tmp=request_input_real_format; 984 HINTERNET hInternet = InternetOpen ((LPCTSTR) "ZooWPSClient\0",INTERNET_OPEN_TYPE_PRECONFIG, NULL, NULL, 0); 985 while(tmp!=NULL){ 986 map * tmp_map = getMap(tmp->content,"xlink:href"); 987 if (tmp_map != NULL){ 988 if (loadRemoteFile(&map_c, &tmp_map, &hInternet,tmp_map->value) < 0) { 989 /* passer le status failed dans la base de donnée */ 990 fprintf(stderr,"Erreur de chargement \n"); 991 } 992 } 993 tmp=tmp->next; 994 } 995 runHttpRequests (&map_c, &request_input_real_format, &hInternet); 996 InternetCloseHandle (&hInternet); 997 free(tmp); 998 map * uuid = getMapFromMaps(map_c,"lenv","usid"); 999 if (uuid != NULL) 1000 start_job(uuid->value); 1001 map *t=createMap("background","1"); 1002 maps * lenv = getMaps(map_c,"lenv"); 1003 addMapToMap(&lenv->content,t); 1004 freeMap(&t); 1005 free(t); 1006 1007 map * m_identifier = getMap (request_inputs, "Identifier"); 1008 1009 service_identifier = zStrdup (m_identifier->value); 1010 1011 s1 = search_service (service_identifier); 1012 free(service_identifier); 1013 1014 1015 //dumpMaps(request_input_real_format); 1016 1017 loadServiceAndRun(&map_c, s1,request_inputs,&request_input_real_format, &request_output_real_format, &eres,NULL,NULL); 1018 if (eres == SERVICE_SUCCEEDED) { 1019 outputResponse (s1,request_input_real_format,request_output_real_format,request_inputs, 0, map_c, eres,NULL,NULL); 1020 } 1021 1022 1023 //dumpMaps(request_output_real_format); 1024 //fprintf(stderr,"################################################################\n"); 1025 //dumpMaps(map_c); 1026 1027 outputResponse (s1,request_input_real_format,request_output_real_format,request_inputs, 0, map_c, eres,NULL,NULL); 1028 1029 1030 freeMaps(&map_c); 1031 map_c= NULL; 1032 1033 freeMaps(&request_input_real_format); 1034 request_input_real_format = NULL; 1035 1036 //dumpMap(request_inputs); 1037 freeMap(&request_inputs); 1038 request_inputs = NULL; 1039 1040 //dumpMaps(request_output_real_format); 1041 freeMaps(&request_output_real_format); 1042 request_output_real_format = NULL; 1043 consumer_ack_amqp(c); 1044 1045 1046 } 1047 close_amqp(); 1048 1049 1050 1051 1052 1053 } 1054 else { 1055 wait(0); 1056 fprintf(stderr,"Master async %d\n",getpid()); 1057 fprintf(stderr,"New async Child\n"); 1058 fflush(stderr); 1059 fork(); 1060 } 1061 } 748 1062 749 1063 }
Note: See TracChangeset
for help on using the changeset viewer.