From b56aa9832ecf1193d3d364556a7a5bde58508daf Mon Sep 17 00:00:00 2001 From: Jay Sorg Date: Sun, 5 Jul 2015 23:14:46 -0700 Subject: [PATCH] work on main loop changes --- common/parse.h | 4 ++ common/trans.c | 135 +++++++++++++++++++++++++++++++++------- common/trans.h | 18 ++++++ libxrdp/libxrdpinc.h | 2 + libxrdp/xrdp_fastpath.c | 2 +- libxrdp/xrdp_iso.c | 4 +- xrdp/xrdp_process.c | 6 +- 7 files changed, 142 insertions(+), 29 deletions(-) diff --git a/common/parse.h b/common/parse.h index 2ae3927b..b7f93bfe 100644 --- a/common/parse.h +++ b/common/parse.h @@ -40,13 +40,17 @@ struct stream char *end; char *data; int size; + int pad0; /* offsets of various headers */ char *iso_hdr; char *mcs_hdr; char *sec_hdr; char *rdp_hdr; char *channel_hdr; + /* other */ char *next_packet; + struct stream *next; + int *source; }; /******************************************************************************/ diff --git a/common/trans.c b/common/trans.c index 3828a174..89679844 100644 --- a/common/trans.c +++ b/common/trans.c @@ -24,6 +24,8 @@ #include "parse.h" #include "ssl_calls.h" +#define MAX_SBYTES 0 + /*****************************************************************************/ int APP_CC trans_tls_recv(struct trans *self, void *ptr, int len) @@ -171,11 +173,27 @@ int APP_CC trans_get_wait_objs_rw(struct trans *self, tbus *robjs, int *rcount, tbus *wobjs, int *wcount) { - if (trans_get_wait_objs(self, robjs, rcount) != 0) + if (self == 0) { return 1; } + if (self->status != TRANS_STATUS_UP) + { + return 1; + } + + if ((self->si != 0) && (self->si->source[self->my_source] > MAX_SBYTES)) + { + } + else + { + if (trans_get_wait_objs(self, robjs, rcount) != 0) + { + return 1; + } + } + if (self->wait_s != 0) { wobjs[*wcount] = self->sck; @@ -187,7 +205,7 @@ trans_get_wait_objs_rw(struct trans *self, tbus *robjs, int *rcount, /*****************************************************************************/ int APP_CC -send_waiting(struct trans *self, int block) +trans_send_waiting(struct trans *self, int block) { struct stream *temp_s; int bytes; @@ -209,9 +227,13 @@ send_waiting(struct trans *self, int block) if (sent > 0) { temp_s->p += sent; + if (temp_s->source != 0) + { + temp_s->source[0] -= sent; + } if (temp_s->p >= temp_s->end) { - self->wait_s = (struct stream *) (temp_s->next_packet); + self->wait_s = temp_s->next; free_stream(temp_s); } } @@ -247,6 +269,7 @@ trans_check_wait_objs(struct trans *self) int to_read = 0; int read_so_far = 0; int rv = 0; + int cur_source; if (self == 0) { @@ -310,8 +333,17 @@ trans_check_wait_objs(struct trans *self) } else /* connected server or client (2 or 3) */ { - if (self->trans_can_recv(self, self->sck, 0)) + if (self->si != 0 && self->si->source[self->my_source] > MAX_SBYTES) { + } + else if (g_tcp_can_recv(self->sck, 0)) + { + cur_source = 0; + if (self->si != 0) + { + cur_source = self->si->cur_source; + self->si->cur_source = self->my_source; + } read_so_far = (int) (self->in_s->end - self->in_s->data); to_read = self->header_size - read_so_far; @@ -329,6 +361,10 @@ trans_check_wait_objs(struct trans *self) { /* error */ self->status = TRANS_STATUS_DOWN; + if (self->si != 0) + { + self->si->cur_source = cur_source; + } return 1; } } @@ -336,6 +372,10 @@ trans_check_wait_objs(struct trans *self) { /* error */ self->status = TRANS_STATUS_DOWN; + if (self->si != 0) + { + self->si->cur_source = cur_source; + } return 1; } else @@ -357,8 +397,12 @@ trans_check_wait_objs(struct trans *self) } } } + if (self->si != 0) + { + self->si->cur_source = cur_source; + } } - if (send_waiting(self, 0) != 0) + if (trans_send_waiting(self, 0) != 0) { /* error */ self->status = TRANS_STATUS_DOWN; @@ -453,7 +497,7 @@ trans_force_write_s(struct trans *self, struct stream *out_s) size = (int) (out_s->end - out_s->data); total = 0; - if (send_waiting(self, 1) != 0) + if (trans_send_waiting(self, 1) != 0) { self->status = TRANS_STATUS_DOWN; return 1; @@ -512,23 +556,68 @@ trans_force_write(struct trans *self) /*****************************************************************************/ int APP_CC -trans_write_copy(struct trans *self) +trans_write_copy_s(struct trans *self, struct stream *out_s) { int size; - struct stream *out_s; + int sent; struct stream *wait_s; struct stream *temp_s; + char *out_data; if (self->status != TRANS_STATUS_UP) { return 1; } - - out_s = self->out_s; + /* try to send any left over */ + if (trans_send_waiting(self, 0) != 0) + { + /* error */ + self->status = TRANS_STATUS_DOWN; + return 1; + } + out_data = out_s->data; + sent = 0; size = (int) (out_s->end - out_s->data); - make_stream(wait_s); + if (self->wait_s == 0) + { + /* if no left over, try to send this new data */ + if (g_tcp_can_send(self->sck, 0)) + { + sent = self->trans_send(self, out_s->data, size); + if (sent > 0) + { + out_data += sent; + size -= sent; + } + else if (sent == 0) + { + return 1; + } + else + { + if (!g_tcp_last_error_would_block(self->sck)) + { + return 1; + } + } + } + } + if (size < 1) + { + return 0; + } + /* did not send right away, have to copy */ + make_stream(wait_s); init_stream(wait_s, size); - out_uint8a(wait_s, out_s->data, size); + if (self->si != 0) + { + if (self->si->cur_source != 0) + { + self->si->source[self->si->cur_source] += size; + wait_s->source = self->si->source + self->si->cur_source; + } + } + out_uint8a(wait_s, out_data, size); s_mark_end(wait_s); wait_s->p = wait_s->data; if (self->wait_s == 0) @@ -538,24 +627,22 @@ trans_write_copy(struct trans *self) else { temp_s = self->wait_s; - while (temp_s->next_packet != 0) + while (temp_s->next != 0) { - temp_s = (struct stream *) (temp_s->next_packet); + temp_s = temp_s->next; } - temp_s->next_packet = (char *) wait_s; + temp_s->next = wait_s; } - - /* try to send */ - if (send_waiting(self, 0) != 0) - { - /* error */ - self->status = TRANS_STATUS_DOWN; - return 1; - } - return 0; } +/*****************************************************************************/ +int APP_CC +trans_write_copy(struct trans* self) +{ + return trans_write_copy_s(self, self->out_s); +} + /*****************************************************************************/ int APP_CC trans_connect(struct trans *self, const char *server, const char *port, diff --git a/common/trans.h b/common/trans.h index c2a10762..34816eed 100644 --- a/common/trans.h +++ b/common/trans.h @@ -45,6 +45,20 @@ typedef int (APP_CC *trans_recv_proc) (struct trans *self, void *ptr, int len); typedef int (APP_CC *trans_send_proc) (struct trans *self, const void *data, int len); typedef int (APP_CC *trans_can_recv_proc) (struct trans *self, int sck, int millis); +/* optional source info */ + +#define XRDP_SOURCE_NONE 0 +#define XRDP_SOURCE_CLIENT 1 +#define XRDP_SOURCE_SESMAN 2 +#define XRDP_SOURCE_CHANSRV 3 +#define XRDP_SOURCE_MOD 4 + +struct source_info +{ + int cur_source; + int source[7]; +}; + struct trans { tbus sck; /* socket handle */ @@ -68,6 +82,8 @@ struct trans trans_recv_proc trans_recv; trans_send_proc trans_send; trans_can_recv_proc trans_can_recv; + struct source_info *si; + int my_source; }; struct trans* APP_CC @@ -93,6 +109,8 @@ trans_force_write(struct trans* self); int APP_CC trans_write_copy(struct trans* self); int APP_CC +trans_write_copy_s(struct trans* self, struct stream* out_s); +int APP_CC trans_connect(struct trans* self, const char* server, const char* port, int timeout); int APP_CC diff --git a/libxrdp/libxrdpinc.h b/libxrdp/libxrdpinc.h index 8d99814c..e3e79564 100644 --- a/libxrdp/libxrdpinc.h +++ b/libxrdp/libxrdpinc.h @@ -72,6 +72,8 @@ struct xrdp_session int up_and_running; int (*is_term)(void); int in_process_data; /* inc / dec libxrdp_process_data calls */ + + struct source_info si; }; struct xrdp_session * DEFAULT_CC diff --git a/libxrdp/xrdp_fastpath.c b/libxrdp/xrdp_fastpath.c index 5bf63b29..6a0cdfc0 100644 --- a/libxrdp/xrdp_fastpath.c +++ b/libxrdp/xrdp_fastpath.c @@ -137,7 +137,7 @@ xrdp_fastpath_session_callback(struct xrdp_fastpath *self, int msg, int APP_CC xrdp_fastpath_send(struct xrdp_fastpath *self, struct stream *s) { - if (trans_force_write_s(self->trans, s) != 0) + if (trans_write_copy_s(self->trans, s) != 0) { return 1; } diff --git a/libxrdp/xrdp_iso.c b/libxrdp/xrdp_iso.c index 1a2fb08a..8369226c 100644 --- a/libxrdp/xrdp_iso.c +++ b/libxrdp/xrdp_iso.c @@ -286,7 +286,7 @@ xrdp_iso_send_cc(struct xrdp_iso *self) len_ptr[1] = len; len_indicator_ptr[0] = len_indicator; - if (trans_force_write_s(self->trans, s) != 0) + if (trans_write_copy_s(self->trans, s) != 0) { free_stream(s); return 1; @@ -409,7 +409,7 @@ xrdp_iso_send(struct xrdp_iso *self, struct stream *s) out_uint8(s, ISO_PDU_DT); out_uint8(s, 0x80); - if (trans_force_write_s(self->trans, s) != 0) + if (trans_write_copy_s(self->trans, s) != 0) { return 1; } diff --git a/xrdp/xrdp_process.c b/xrdp/xrdp_process.c index 5a7cd1d8..4f869ab2 100644 --- a/xrdp/xrdp_process.c +++ b/xrdp/xrdp_process.c @@ -195,6 +195,8 @@ xrdp_process_main_loop(struct xrdp_process *self) self->server_trans->callback_data = self; init_stream(self->server_trans->in_s, 8192 * 4); self->session = libxrdp_init((tbus)self, self->server_trans); + self->server_trans->si = &(self->session->si); + self->server_trans->my_source = XRDP_SOURCE_CLIENT; /* this callback function is in xrdp_wm.c */ self->session->callback = callback; /* this function is just above */ @@ -217,8 +219,8 @@ xrdp_process_main_loop(struct xrdp_process *self) robjs[robjs_count++] = self->self_term_event; xrdp_wm_get_wait_objs(self->wm, robjs, &robjs_count, wobjs, &wobjs_count, &timeout); - trans_get_wait_objs(self->server_trans, robjs, &robjs_count); - + trans_get_wait_objs_rw(self->server_trans, robjs, &robjs_count, + wobjs, &wobjs_count); /* wait */ if (g_obj_wait(robjs, robjs_count, wobjs, wobjs_count, timeout) != 0) {