source: branches/PublicaMundi_David-devel/zoo-project/zoo-kernel/zoo_amqp.c @ 709

Last change on this file since 709 was 615, checked in by david, 10 years ago

add amqp queue management

File size: 4.1 KB
Line 
1/**
2 * Author : David Saggiorato
3 *
4 *  Copyright 2008-2009 GeoLabs SARL. All rights reserved.
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 * THE SOFTWARE.
23 */
24
25
26
27#include <stdlib.h>
28#include <stdio.h>
29#include <string.h>
30
31#include <stdint.h>
32#include <amqp_tcp_socket.h>
33#include <amqp.h>
34#include <amqp_framing.h>
35
36
37amqp_connection_state_t conn;
38amqp_socket_t *socket = NULL;
39char * amqp_exchange;
40char * amqp_routingkey;
41char * amqp_hostname;
42int amqp_port;
43char *amqp_user;
44char *amqp_passwd;
45char * amqp_queue;
46
47
48void init_amqp(const char * hostname, int port,const char *user, const char *passwd,const char *exchange, const char * routingkey,const char * queue){
49    amqp_hostname = strdup(hostname);
50    amqp_user = strdup(user);
51    amqp_passwd = strdup(passwd);
52    amqp_exchange = strdup(exchange);
53    amqp_routingkey = strdup(routingkey);
54    amqp_queue = strdup(queue);
55    amqp_port = port;
56}
57
58void init_consumer(){
59    amqp_basic_consume(conn, 1, amqp_cstring_bytes(amqp_queue), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
60    amqp_get_rpc_reply(conn);
61}
62
63uint64_t consumer_amqp(char **msg){
64    amqp_rpc_reply_t res;
65    amqp_envelope_t envelope;
66    amqp_maybe_release_buffers(conn);
67    res = amqp_consume_message(conn, &envelope, NULL, 0);
68    if (AMQP_RESPONSE_NORMAL != res.reply_type) {
69        return 0;
70    }
71    unsigned char *buf = (unsigned char *) envelope.message.body.bytes;
72    size_t i;
73    char * tmp = (char *) malloc(envelope.message.body.len * sizeof(char*));
74    for (i = 0; i < envelope.message.body.len; i++) {
75        tmp[i] = (char)buf[i];
76    }
77    tmp[envelope.message.body.len]='\0';
78    *msg = tmp;
79    uint64_t r = envelope.delivery_tag;
80    amqp_destroy_envelope(&envelope);
81    return r;
82}
83
84
85int consumer_ack_amqp(uint64_t delivery_tag){
86    return amqp_basic_ack(conn,1,delivery_tag,0);
87}
88
89
90
91int bind_amqp(){
92    conn = amqp_new_connection();
93    socket = amqp_tcp_socket_new(conn);
94    int  status;
95    if (!socket)
96        return -1;
97    status = amqp_socket_open(socket, amqp_hostname, amqp_port);
98    if (status != 0)
99        return status;
100    amqp_rpc_reply_t s = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, amqp_user, amqp_passwd);
101    if (s.reply_type != AMQP_RESPONSE_NORMAL)
102        return -2;
103    amqp_channel_open(conn, 1);
104    s = amqp_get_rpc_reply(conn);
105    if (s.reply_type != AMQP_RESPONSE_NORMAL)
106        return -3;
107    return 0;
108
109}
110
111int send_msg(const char * msg, const char * content_type){
112    amqp_basic_properties_t props;
113    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
114    props.content_type = amqp_cstring_bytes(content_type);
115    props.delivery_mode = 2; /* persistent delivery mode */
116    int ret = amqp_basic_publish(conn,1,amqp_cstring_bytes(amqp_exchange),amqp_cstring_bytes(amqp_routingkey), 0, 0, &props, amqp_cstring_bytes(msg));
117    if (ret < 0)
118        fprintf(stderr, " %s\n", amqp_error_string2(ret));
119    return ret;
120}
121
122int close_amqp(){
123    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
124    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
125    amqp_destroy_connection(conn);
126    return 0;
127}
128
Note: See TracBrowser for help on using the repository browser.

Search

Context Navigation

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.png http://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor!

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png