Skip to content
Snippets Groups Projects
Commit af5ea3a6 authored by Sven Willner's avatar Sven Willner
Browse files

Send metadata as msgpack

parent b6f2b9d3
No related branches found
No related tags found
No related merge requests found
......@@ -3,6 +3,7 @@
#ifdef HAVE_BORGES
#include <assert.h>
#include <msgpack/msgpack.h>
#include <netdb.h>
#include <string.h>
#include <sys/socket.h>
......@@ -16,13 +17,10 @@
#include <borges/borges.h>
#define BORGES_DEFAULT_PORT "7979"
static int borges_ev_handler(borg_connection *conn, borg_event_type event_type, void *event_data, void *userp);
typedef struct borg_info_t
{
const char *expid;
int streamid;
bool shutdown;
borg_context borg_ctx;
......@@ -62,87 +60,155 @@ int borges_write_grib_message(stream_t *streamptr, const void *gribbuffer, size_
return borges_write_buf(streamptr->fileID, gribbuffer, nbytes);
}
static int parse(const char *path, const char **host, size_t *host_len, const char **port, size_t *port_len, const char **meta, size_t *meta_len)
int borges_connect_int(const char *path, char filemode, stream_t *streamptr)
{
borg_info_t *info = (borg_info_t *)Calloc(1, sizeof(borg_info_t));
// parse BORGES address format: borges://[<host>][:<port>]/[<meta>]
const char *host = NULL;
size_t host_len = 0;
const char *port = NULL;
size_t port_len = 0;
const char *p = path;
*host = p;
host = p;
for (; *p != '/' && *p != ':' && *p != '\0'; ++p)
{
if ((*p < 'A' || *p > 'Z') && (*p < 'a' || *p > 'z') && *p != '+' && *p != '.' && *p != '-')
return -1;
if ((*p < 'A' || *p > 'Z') && (*p < 'a' || *p > 'z') && (*p < '0' || *p > '9') && *p != '+' && *p != '.' && *p != '-' && *p != '_')
{
Error("Borges Error: Invalid host");
goto error1;
}
}
*host_len = (p - *host);
host_len = p - host;
if (*p == ':')
{
++p;
*port = p;
port = ++p;
for (; *p != '/' && *p != '\0'; ++p)
{
if (*p < '0' || *p > '9')
return -1;
{
Error("Borges Error: Invalid port");
goto error1;
}
}
*port_len = (p - *port);
port_len = p - port;
}
borg_msgpack_writer writer;
borg_msgpack_init_writer(info->meta, sizeof(info->meta), &writer);
borg_msgpack_write_start_map(&writer);
if (*p == '/')
{
*meta = ++p;
for (; *p != '\0'; ++p)
const char *key = ++p;
const char *value = NULL;
for (; *p != '/' && *p != '\0'; ++p)
{
switch (*p)
{
case '=':
if (key == NULL)
{
Error("Borges Error: '=' in value in metadata dict");
goto error1;
}
if (key == p)
{
Error("Borges Error: Empty key given");
goto error1;
}
if (borg_msgpack_write_stringn(key, p - key, &writer) != BORG_MSGPACK_SUCCESS)
{
Error("Borges Error: Metadata dict too long");
goto error1;
}
value = p + 1;
key = NULL;
break;
case ',':
if (value == NULL)
{
Error("Borges Error: No value given for key in metadata dict");
goto error1;
}
if (borg_msgpack_write_stringn(value, p - value, &writer) != BORG_MSGPACK_SUCCESS)
{
Error("Borges Error: Metadata dict too long");
goto error1;
}
key = p + 1;
value = NULL;
break;
default:
if ((*p < 'A' || *p > 'Z') && (*p < 'a' || *p > 'z') && (*p < '0' || *p > '9') && *p != '+' && *p != '.' && *p != '-' && *p != '_')
{
Error("Borges Error: Invalid character in metadata dict");
goto error1;
}
break;
}
}
if (value == NULL)
{
if ((*p < 'A' || *p > 'Z') && (*p < 'a' || *p > 'z') && *p != '+' && *p != '.' && *p != '-' && *p != '=')
return -1;
Error("Borges Error: No value given for key in metadata dict");
goto error1;
}
if (borg_msgpack_write_stringn(value, p - value, &writer) != BORG_MSGPACK_SUCCESS)
{
Error("Borges Error: Metadata dict too long");
goto error1;
}
*meta_len = (p - *meta);
}
return 0;
}
int borges_connect_int(const char *path, char filemode, stream_t *streamptr)
{
// parse BORGES address format: borges://<host>[:<port>]/[meta_info]
const char *host = NULL;
size_t host_len = 0;
const char *port = NULL;
size_t port_len = 0;
const char *meta = NULL;
size_t meta_len = 0;
if (parse(path, &host, &host_len, &port, &port_len, &meta, &meta_len) < 0)
if (borg_msgpack_write_end_map(&writer) != BORG_MSGPACK_SUCCESS)
{
Warning("Invalid path");
return -1;
Error("Borges Error: Metadata dict too long");
goto error1;
}
if (port == NULL)
if (borg_msgpack_release_write_buffer(&writer, NULL, &info->meta_len) != BORG_MSGPACK_SUCCESS)
{
port = BORGES_DEFAULT_PORT;
meta_len = sizeof(BORGES_DEFAULT_PORT);
Error("Borges Error: Metadata dict too long");
goto error1;
}
borg_info_t *info = (borg_info_t *)Calloc(1, sizeof(borg_info_t));
memcpy(info->meta, meta, meta_len);
info->meta_len = meta_len;
borg_err err;
if (borg_context_init(&info->borg_ctx, &err) < 0)
{
log_err(&err, "Failed to init borges context");
Free(info);
return -1;
log_err(&err, "Failed to initialize context");
goto error1;
}
if (port_len == 0)
{
static const char* BORGES_DEFAULT_PORT = "7979";
port = BORGES_DEFAULT_PORT;
port_len = strlen(BORGES_DEFAULT_PORT);
}
char borges_path[256];
snprintf(borges_path, sizeof(borges_path), "tcp://%.*s:%.*s", host_len, host, port_len, port);
if (borg_connection_init(&info->borg_conn, &info->borg_ctx, borges_path, borges_ev_handler, info, &err) < 0)
if (host_len == 0)
{
Warning("Path is %s", path);
log_err(&err, "Failed to init borges connection");
borg_context_cleanup(&info->borg_ctx);
Free(info);
return -1;
if (borg_maybe_spawn_and_connect(&info->borg_conn, &info->borg_ctx, NULL, borges_ev_handler, info, &err) < 0)
{
log_err(&err, "Failed to connect");
goto error2;
}
}
else
{
char borges_path[256];
snprintf(borges_path, sizeof(borges_path), "tcp://%.*s:%.*s", host_len, host, port_len, port);
if (borg_connection_init(&info->borg_conn, &info->borg_ctx, borges_path, borges_ev_handler, info, &err) < 0)
{
char err_msg[256];
borg_err_snprint(&err, err_msg, sizeof(err_msg));
Error("Borges Error: Failed to connect, %s (for %s)", err_msg, borges_path);
goto error2;
}
}
borg_context_run(&info->borg_ctx);
if (info->shutdown)
{
borg_connection_cleanup(&info->borg_conn);
borg_context_cleanup(&info->borg_ctx);
Free(info);
goto error3;
return -1;
}
streamptr->protocolData = info;
......@@ -155,8 +221,11 @@ int borges_connect_int(const char *path, char filemode, stream_t *streamptr)
}
}
error3:
borg_connection_cleanup(&info->borg_conn);
error2:
borg_context_cleanup(&info->borg_ctx);
error1:
Free(info);
return -1;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment