Commit df658b02 authored by vvaltman's avatar vvaltman

proxy conn

parent 1b916c40
...@@ -493,14 +493,16 @@ int cpu_server_close_connection (connection_job_t C, int who) /* {{{ */ { ...@@ -493,14 +493,16 @@ int cpu_server_close_connection (connection_job_t C, int who) /* {{{ */ {
assert (c->io_conn); assert (c->io_conn);
job_signal (JOB_REF_PASS (c->io_conn), JS_ABORT); job_signal (JOB_REF_PASS (c->io_conn), JS_ABORT);
if (c->target) { if (c->basic_type == ct_outbound) {
MODULE_STAT->outbound_connections --; MODULE_STAT->outbound_connections --;
if (connection_is_active (c->flags)) { if (connection_is_active (c->flags)) {
MODULE_STAT->active_outbound_connections --; MODULE_STAT->active_outbound_connections --;
} }
job_signal (JOB_REF_PASS (c->target), JS_RUN); if (c->target) {
job_signal (JOB_REF_PASS (c->target), JS_RUN);
}
} else { } else {
MODULE_STAT->inbound_connections --; MODULE_STAT->inbound_connections --;
...@@ -544,7 +546,9 @@ int do_connection_job (job_t job, int op, struct job_thread *JT) /* {{{ */ { ...@@ -544,7 +546,9 @@ int do_connection_job (job_t job, int op, struct job_thread *JT) /* {{{ */ {
__sync_fetch_and_and (&c->flags, ~C_READY_PENDING); __sync_fetch_and_and (&c->flags, ~C_READY_PENDING);
MODULE_STAT->active_outbound_connections ++; MODULE_STAT->active_outbound_connections ++;
MODULE_STAT->active_connections ++; MODULE_STAT->active_connections ++;
__sync_fetch_and_add (&CONN_TARGET_INFO(c->target)->active_outbound_connections, 1); if (c->target) {
__sync_fetch_and_add (&CONN_TARGET_INFO(c->target)->active_outbound_connections, 1);
}
if (c->status == conn_connecting) { if (c->status == conn_connecting) {
if (!__sync_bool_compare_and_swap (&c->status, conn_connecting, conn_working)) { if (!__sync_bool_compare_and_swap (&c->status, conn_connecting, conn_working)) {
assert (c->status == conn_error); assert (c->status == conn_error);
...@@ -587,7 +591,7 @@ int do_connection_job (job_t job, int op, struct job_thread *JT) /* {{{ */ { ...@@ -587,7 +591,7 @@ int do_connection_job (job_t job, int op, struct job_thread *JT) /* {{{ */ {
updates stats updates stats
creates socket_connection creates socket_connection
*/ */
connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening_connection_job_t LCJ, unsigned peer, unsigned char peer_ipv6[16], int peer_port) /* {{{ */ { connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening_connection_job_t LCJ, int basic_type, conn_type_t *conn_type, void *conn_extra, unsigned peer, unsigned char peer_ipv6[16], int peer_port) /* {{{ */ {
if (cfd < 0) { if (cfd < 0) {
return NULL; return NULL;
} }
...@@ -648,12 +652,12 @@ connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening ...@@ -648,12 +652,12 @@ connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening
assert (0); assert (0);
} }
c->type = CT ? CT->type : LC->type; c->type = conn_type;
c->extra = CT ? CT->extra : LC->extra; c->extra = conn_extra;
assert (c->type); assert (c->type);
c->basic_type = CT ? ct_outbound : ct_inbound; c->basic_type = basic_type;
c->status = CT ? conn_connecting : conn_working; c->status = (basic_type == ct_outbound) ? conn_connecting : conn_working;
c->flags |= c->type->flags & C_EXTERNAL; c->flags |= c->type->flags & C_EXTERNAL;
if (LC) { if (LC) {
...@@ -692,41 +696,58 @@ connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening ...@@ -692,41 +696,58 @@ connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening
c->out_queue = alloc_mp_queue_w (); c->out_queue = alloc_mp_queue_w ();
//c->out_packet_queue = alloc_mp_queue_w (); //c->out_packet_queue = alloc_mp_queue_w ();
if (CT) { if (basic_type == ct_outbound) {
vkprintf (1, "New connection %s:%d -> %s:%d\n", show_our_ip (C), c->our_port, show_remote_ip (C), c->remote_port); vkprintf (1, "New connection %s:%d -> %s:%d\n", show_our_ip (C), c->our_port, show_remote_ip (C), c->remote_port);
} else { } else {
vkprintf (1, "New connection %s:%d -> %s:%d\n", show_remote_ip (C), c->remote_port, show_our_ip (C), c->our_port); vkprintf (1, "New connection %s:%d -> %s:%d\n", show_remote_ip (C), c->remote_port, show_our_ip (C), c->our_port);
} }
int (*func)(connection_job_t) = CT ? CT->type->init_outbound : LC->type->init_accepted; int (*func)(connection_job_t) = (basic_type == ct_outbound) ? c->type->init_outbound : c->type->init_accepted;
vkprintf (3, "func = %p\n", func); vkprintf (3, "func = %p\n", func);
if (func (C) >= 0) { if (func (C) >= 0) {
if (CT) { if (basic_type == ct_outbound) {
job_incref (CTJ);
MODULE_STAT->outbound_connections ++; MODULE_STAT->outbound_connections ++;
MODULE_STAT->allocated_outbound_connections ++; MODULE_STAT->allocated_outbound_connections ++;
MODULE_STAT->outbound_connections_created ++; MODULE_STAT->outbound_connections_created ++;
CT->outbound_connections ++; if (CTJ) {
job_incref (CTJ);
CT->outbound_connections ++;
}
} else { } else {
MODULE_STAT->inbound_connections_accepted ++; MODULE_STAT->inbound_connections_accepted ++;
MODULE_STAT->allocated_inbound_connections ++; MODULE_STAT->allocated_inbound_connections ++;
MODULE_STAT->inbound_connections ++; MODULE_STAT->inbound_connections ++;
MODULE_STAT->active_inbound_connections ++; MODULE_STAT->active_inbound_connections ++;
MODULE_STAT->active_connections ++; MODULE_STAT->active_connections ++;
if (LCJ) {
c->listening = LC->fd;
c->listening_generation = LC->generation;
if (LC->flags & C_NOQACK) {
c->flags |= C_NOQACK;
}
c->listening = LC->fd; c->window_clamp = LC->window_clamp;
c->listening_generation = LC->generation;
if (LC->flags & C_NOQACK) { if (LC->flags & C_SPECIAL) {
c->flags |= C_NOQACK; c->flags |= C_SPECIAL;
__sync_fetch_and_add (&active_special_connections, 1);
if (active_special_connections > max_special_connections) {
vkprintf (active_special_connections >= max_special_connections + 16 ? 0 : 1, "ERROR: forced to accept connection when special connections limit was reached (%d of %d)\n", active_special_connections, max_special_connections);
}
if (active_special_connections >= max_special_connections) {
vkprintf (2, "**Invoking epoll_remove(%d)\n", LC->fd);
epoll_remove (LC->fd);
}
}
} }
c->window_clamp = LC->window_clamp;
if (c->window_clamp) { if (c->window_clamp) {
if (setsockopt (cfd, IPPROTO_TCP, TCP_WINDOW_CLAMP, &c->window_clamp, 4) < 0) { if (setsockopt (cfd, IPPROTO_TCP, TCP_WINDOW_CLAMP, &c->window_clamp, 4) < 0) {
vkprintf (0, "error while setting window size for socket %d to %d: %m\n", cfd, c->window_clamp); vkprintf (0, "error while setting window size for socket %d to %d: %m\n", cfd, c->window_clamp);
...@@ -739,18 +760,6 @@ connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening ...@@ -739,18 +760,6 @@ connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening
} }
} }
if (LC->flags & C_SPECIAL) {
c->flags |= C_SPECIAL;
__sync_fetch_and_add (&active_special_connections, 1);
if (active_special_connections > max_special_connections) {
vkprintf (active_special_connections >= max_special_connections + 16 ? 0 : 1, "ERROR: forced to accept connection when special connections limit was reached (%d of %d)\n", active_special_connections, max_special_connections);
}
if (active_special_connections >= max_special_connections) {
vkprintf (2, "**Invoking epoll_remove(%d)\n", LC->fd);
epoll_remove (LC->fd);
}
}
} }
alloc_new_socket_connection (C); alloc_new_socket_connection (C);
...@@ -1279,10 +1288,10 @@ int net_accept_new_connections (listening_connection_job_t LCJ) /* {{{ */ { ...@@ -1279,10 +1288,10 @@ int net_accept_new_connections (listening_connection_job_t LCJ) /* {{{ */ {
connection_job_t C; connection_job_t C;
if (peer.a4.sin_family == AF_INET) { if (peer.a4.sin_family == AF_INET) {
C = alloc_new_connection (cfd, NULL, LCJ, C = alloc_new_connection (cfd, NULL, LCJ, ct_inbound, LC->type, LC->extra,
ntohl (peer.a4.sin_addr.s_addr), NULL, ntohs (peer.a4.sin_port)); ntohl (peer.a4.sin_addr.s_addr), NULL, ntohs (peer.a4.sin_port));
} else { } else {
C = alloc_new_connection (cfd, NULL, LCJ, C = alloc_new_connection (cfd, NULL, LCJ, ct_inbound, LC->type, LC->extra,
0, peer.a6.sin6_addr.s6_addr, ntohs (peer.a6.sin6_port)); 0, peer.a6.sin6_addr.s6_addr, ntohs (peer.a6.sin6_port));
} }
if (C) { if (C) {
...@@ -1726,7 +1735,7 @@ int create_new_connections (conn_target_job_t CTJ) /* {{{ */ { ...@@ -1726,7 +1735,7 @@ int create_new_connections (conn_target_job_t CTJ) /* {{{ */ {
break; break;
} }
connection_job_t C = alloc_new_connection (cfd, CTJ, NULL, connection_job_t C = alloc_new_connection (cfd, CTJ, NULL, ct_outbound, CT->type, CT->extra,
ntohl (CT->target.s_addr), CT->target_ipv6, CT->port); ntohl (CT->target.s_addr), CT->target_ipv6, CT->port);
if (C) { if (C) {
......
...@@ -195,6 +195,24 @@ struct conn_target_info { ...@@ -195,6 +195,24 @@ struct conn_target_info {
int global_refcnt; int global_refcnt;
}; };
struct pseudo_conn_target_info {
struct event_timer timer;
int pad1;
int pad2;
void *pad3;
conn_type_t *type;
void *extra;
struct in_addr target;
unsigned char target_ipv6[16];
int port;
int active_outbound_connections, outbound_connections;
int ready_outbound_connections;
connection_job_t in_conn;
connection_job_t out_conn;
};
struct connection_info { struct connection_info {
struct event_timer timer; struct event_timer timer;
int fd; int fd;
...@@ -429,4 +447,4 @@ extern unsigned nat_info[MAX_NAT_INFO_RULES][2]; ...@@ -429,4 +447,4 @@ extern unsigned nat_info[MAX_NAT_INFO_RULES][2];
int net_add_nat_info (char *str); int net_add_nat_info (char *str);
unsigned nat_translate_ip (unsigned local_ip); unsigned nat_translate_ip (unsigned local_ip);
connection_job_t alloc_new_connection (int cfd, conn_target_job_t SS, connection_job_t LL, unsigned peer, unsigned char peer_ipv6[16], int peer_port); connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening_connection_job_t LCJ, int basic_type, conn_type_t *conn_type, void *conn_extra, unsigned peer, unsigned char peer_ipv6[16], int peer_port);
...@@ -85,6 +85,52 @@ conn_type_t ct_tcp_rpc_ext_server = { ...@@ -85,6 +85,52 @@ conn_type_t ct_tcp_rpc_ext_server = {
.crypto_needed_output_bytes = cpu_tcp_aes_crypto_ctr128_needed_output_bytes, .crypto_needed_output_bytes = cpu_tcp_aes_crypto_ctr128_needed_output_bytes,
}; };
int tcp_proxy_pass_parse_execute (connection_job_t C);
int tcp_proxy_pass_close (connection_job_t C, int who);
int tcp_proxy_pass_write_packet (connection_job_t c, struct raw_message *raw);
conn_type_t ct_proxy_pass = {
.magic = CONN_FUNC_MAGIC,
.flags = C_RAWMSG,
.title = "proxypass",
.init_accepted = server_failed,
.parse_execute = tcp_proxy_pass_parse_execute,
.close = tcp_proxy_pass_close,
.write_packet = tcp_proxy_pass_write_packet,
.connected = server_noop,
};
int tcp_proxy_pass_parse_execute (connection_job_t C) {
struct connection_info *c = CONN_INFO(C);
if (!c->extra) {
fail_connection (C, -1);
return 0;
}
job_t E = job_incref (c->extra);
struct connection_info *e = CONN_INFO(E);
struct raw_message *r = malloc (sizeof (*r));
rwm_move (r, &c->in);
mpq_push_w (e->out_queue, PTR_MOVE(r), 0);
job_signal (JOB_REF_PASS (E), JS_RUN);
return 0;
}
int tcp_proxy_pass_close (connection_job_t C, int who) {
struct connection_info *c = CONN_INFO(C);
if (c->extra) {
job_t E = PTR_MOVE (c->extra);
fail_connection (E, -23);
job_decref (JOB_REF_PASS (E));
}
return cpu_server_close_connection (C, who);
}
int tcp_proxy_pass_write_packet (connection_job_t C, struct raw_message *raw) {
rwm_union (&CONN_INFO(C)->out, raw);
return 0;
}
int tcp_rpcs_default_execute (connection_job_t c, int op, struct raw_message *msg); int tcp_rpcs_default_execute (connection_job_t c, int op, struct raw_message *msg);
static unsigned char ext_secret[16][16]; static unsigned char ext_secret[16][16];
...@@ -794,20 +840,46 @@ static int is_allowed_timestamp (int timestamp) { ...@@ -794,20 +840,46 @@ static int is_allowed_timestamp (int timestamp) {
return 0; return 0;
} }
static void proxy_connection (connection_job_t C, const struct domain_info *info) { static int proxy_connection (connection_job_t C, const struct domain_info *info) {
const char zero[16] = {}; const char zero[16] = {};
if (info->target.s_addr == 0 && !memcmp (info->target_ipv6, zero, 16)) { if (info->target.s_addr == 0 && !memcmp (info->target_ipv6, zero, 16)) {
vkprintf (0, "failed to proxy request to %s\n", info->domain); vkprintf (0, "failed to proxy request to %s\n", info->domain);
return; fail_connection (C, -17);
return 0;
}
int cfd = -1;
if (info->target.s_addr) {
cfd = client_socket (info->target.s_addr, 443, 0);
} else {
cfd = client_socket_ipv6 (info->target_ipv6, 443, 0);
}
if (cfd < 0) {
fail_connection (C, -27);
return 0;
} }
// TODO proxy the connection to info->target.s_addr / info->target_ipv6 struct connection_info *c = CONN_INFO(C);
c->type->crypto_free (C);
job_incref (C);
job_t EJ = alloc_new_connection (cfd, NULL, NULL, ct_outbound, &ct_proxy_pass, C, ntohl (*(int *)&info->target.s_addr), (void *)info->target_ipv6, 443);
if (!EJ) {
job_decref_f (C);
fail_connection (C, -37);
return 0;
}
c->type = &ct_proxy_pass;
c->extra = PTR_MOVE(EJ);
return c->type->parse_execute (C);
} }
int tcp_rpcs_compact_parse_execute (connection_job_t C) { int tcp_rpcs_compact_parse_execute (connection_job_t C) {
#define RETURN_TLS_ERROR(info) \ #define RETURN_TLS_ERROR(info) \
proxy_connection (C, info); \ return proxy_connection (C, info);
return (-1 << 28);
struct tcp_rpc_data *D = TCP_RPC_DATA (C); struct tcp_rpc_data *D = TCP_RPC_DATA (C);
if (D->crypto_flags & RPCF_COMPACT_OFF) { if (D->crypto_flags & RPCF_COMPACT_OFF) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment