Skip to content

Commit 41c2229

Browse files
committed
fix grpc disconnect, SW_AGENT_MAX_BUFFER_SIZE
1 parent 64212d8 commit 41c2229

File tree

9 files changed

+44
-54
lines changed

9 files changed

+44
-54
lines changed

docs/EnvVars.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Environment Variable | Description | Default
1111
| `SW_AGENT_AUTHENTICATION` | The authentication token used to verify that the agent is trusted by the backend OAP. For how to configure the backend, refer to [the yaml](https://github.com/apache/skywalking/blob/4f0f39ffccdc9b41049903cc540b8904f7c9728e/oap-server/server-bootstrap/src/main/resources/application.yml#L155-L158). | unset |
1212
| `SW_AGENT_LOGGING_LEVEL` | The logging level; one of `CRITICAL`, `FATAL`, `ERROR`, `WARN`(`WARNING`), `INFO`, `DEBUG` | `INFO` |
1313
| `SW_AGENT_DISABLE_PLUGINS` | The plugin name patterns in CSV format; plugins whose name matches one of the patterns won't be installed | `''` |
14+
| `SW_AGENT_MAX_BUFFER_SIZE` | The maximum number of segments that may be queued for sending to the backend; segments beyond this limit are silently dropped | `1000` |
1415
| `SW_SQL_PARAMETERS_LENGTH` | The maximum length of the collected parameter; parameters longer than the specified length will be truncated. A length of 0 turns off parameter tracing | `0` |
1516
| `SW_PYMONGO_TRACE_PARAMETERS` | Indicates whether to collect the filters of pymongo | `False` |
1617
| `SW_PYMONGO_PARAMETERS_MAX_LENGTH` | The maximum length of the collected filters, filters longer than the specified length will be truncated | `512` |

skywalking/agent/__init__.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,24 +38,30 @@
3838

3939
def __heartbeat():
4040
while not __finished.is_set():
41-
if connected():
41+
try:
4242
__protocol.heartbeat()
43+
except Exception as exc:
44+
logger.error(str(exc))
4345

44-
__finished.wait(30 if connected() else 3)
46+
__finished.wait(30)
4547

4648

4749
def __report():
4850
while not __finished.is_set():
49-
if connected():
51+
try:
5052
__protocol.report(__queue) # is blocking actually, blocks for max config.QUEUE_TIMEOUT seconds
53+
except Exception as exc:
54+
logger.error(str(exc))
5155

52-
__finished.wait(1)
56+
__finished.wait(0)
5357

5458

5559
def __query_profile_command():
5660
while not __finished.is_set():
57-
if connected():
61+
try:
5862
__protocol.query_profile_commands()
63+
except Exception as exc:
64+
logger.error(str(exc))
5965

6066
__finished.wait(profile_task_query_interval)
6167

@@ -68,7 +74,7 @@ def __command_dispatch():
6874
def __init_threading():
6975
global __heartbeat_thread, __report_thread, __query_profile_thread, __command_dispatch_thread, __queue, __finished
7076

71-
__queue = Queue(maxsize=10000)
77+
__queue = Queue(maxsize=config.max_buffer_size)
7278
__finished = Event()
7379
__heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True)
7480
__report_thread = Thread(name='ReportThread', target=__report, daemon=True)
@@ -162,10 +168,6 @@ def started():
162168
return __started
163169

164170

165-
def connected():
166-
return __protocol.connected()
167-
168-
169171
def archive(segment: 'Segment'):
170172
try: # unlike checking __queue.full() then inserting, this is atomic
171173
__queue.put(segment, block=False)

skywalking/agent/protocol/__init__.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@ def fork_after_in_parent(self):
2929
def fork_after_in_child(self):
3030
pass
3131

32-
def connected(self):
33-
return False
34-
3532
def heartbeat(self):
3633
raise NotImplementedError()
3734

skywalking/agent/protocol/grpc.py

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import logging
1919
import traceback
20-
from queue import Queue, Empty, Full
20+
from queue import Queue, Empty
2121
from time import time
2222

2323
import grpc
@@ -35,15 +35,13 @@
3535

3636
class GrpcProtocol(Protocol):
3737
def __init__(self):
38+
self.properties_sent = False
3839
self.state = None
3940

4041
if config.force_tls:
41-
self.channel = grpc.secure_channel(config.collector_address, grpc.ssl_channel_credentials(),
42-
options=(('grpc.max_connection_age_grace_ms',
43-
1000 * config.GRPC_TIMEOUT),))
42+
self.channel = grpc.secure_channel(config.collector_address, grpc.ssl_channel_credentials())
4443
else:
45-
self.channel = grpc.insecure_channel(config.collector_address, options=(('grpc.max_connection_age_grace_ms',
46-
1000 * config.GRPC_TIMEOUT),))
44+
self.channel = grpc.insecure_channel(config.collector_address)
4745

4846
if config.authentication:
4947
self.channel = grpc.intercept_channel(
@@ -58,44 +56,42 @@ def __init__(self):
5856
def _cb(self, state):
5957
logger.debug('grpc channel connectivity changed, [%s -> %s]', self.state, state)
6058
self.state = state
61-
if self.connected():
62-
try:
63-
self.service_management.send_instance_props()
64-
except grpc.RpcError:
65-
self.on_error()
6659

6760
def query_profile_commands(self):
6861
logger.debug("query profile commands")
6962
self.profile_query.do_query()
7063

7164
def heartbeat(self):
7265
try:
66+
if not self.properties_sent:
67+
self.service_management.send_instance_props()
68+
self.properties_sent = True
69+
7370
self.service_management.send_heart_beat()
71+
7472
except grpc.RpcError:
7573
self.on_error()
7674

77-
def connected(self):
78-
return self.state == grpc.ChannelConnectivity.READY
79-
8075
def on_error(self):
8176
traceback.print_exc() if logger.isEnabledFor(logging.DEBUG) else None
8277
self.channel.unsubscribe(self._cb)
8378
self.channel.subscribe(self._cb, try_to_connect=True)
8479

8580
def report(self, queue: Queue, block: bool = True):
8681
start = time()
87-
segment = None
8882

8983
def generator():
90-
nonlocal segment
91-
9284
while True:
9385
try:
94-
timeout = max(0, config.QUEUE_TIMEOUT - int(time() - start)) # type: int
86+
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
87+
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
88+
return
9589
segment = queue.get(block=block, timeout=timeout) # type: Segment
9690
except Empty:
9791
return
9892

93+
queue.task_done()
94+
9995
logger.debug('reporting segment %s', segment)
10096

10197
s = SegmentObject(
@@ -137,16 +133,7 @@ def generator():
137133

138134
yield s
139135

140-
queue.task_done()
141-
142136
try:
143137
self.traces_reporter.report(generator())
144-
145138
except grpc.RpcError:
146139
self.on_error()
147-
148-
if segment:
149-
try:
150-
queue.put(segment, block=False)
151-
except Full:
152-
pass

skywalking/agent/protocol/http.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ def fork_after_in_child(self):
3535
self.service_management.fork_after_in_child()
3636
self.traces_reporter.fork_after_in_child()
3737

38-
def connected(self):
39-
return True
40-
4138
def heartbeat(self):
4239
if not self.properties_sent:
4340
self.service_management.send_instance_props()
@@ -50,17 +47,19 @@ def report(self, queue: Queue, block: bool = True):
5047
def generator():
5148
while True:
5249
try:
53-
timeout = max(0, config.QUEUE_TIMEOUT - int(time() - start)) # type: int
50+
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
51+
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
52+
return
5453
segment = queue.get(block=block, timeout=timeout) # type: Segment
5554
except Empty:
5655
return
5756

57+
queue.task_done()
58+
5859
logger.debug('reporting segment %s', segment)
5960

6061
yield segment
6162

62-
queue.task_done()
63-
6463
try:
6564
self.traces_reporter.report(generator=generator())
6665
except Exception:

skywalking/agent/protocol/kafka.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import logging
1919
from skywalking.loggings import logger, getLogger
2020
from queue import Queue, Empty
21+
from time import time
2122

2223
from skywalking import config
2324
from skywalking.agent import Protocol
@@ -43,13 +44,20 @@ def heartbeat(self):
4344
self.service_management.send_heart_beat()
4445

4546
def report(self, queue: Queue, block: bool = True):
47+
start = time()
48+
4649
def generator():
4750
while True:
4851
try:
52+
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
53+
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
54+
return
4955
segment = queue.get(block=block) # type: Segment
5056
except Empty:
5157
return
5258

59+
queue.task_done()
60+
5361
logger.debug('reporting segment %s', segment)
5462

5563
s = SegmentObject(
@@ -91,6 +99,4 @@ def generator():
9199

92100
yield s
93101

94-
queue.task_done()
95-
96102
self.traces_reporter.report(generator())

skywalking/client/grpc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def __init__(self, channel: grpc.Channel):
6060
self.report_stub = TraceSegmentReportServiceStub(channel)
6161

6262
def report(self, generator):
63-
self.report_stub.collect(generator, timeout=config.GRPC_TIMEOUT)
63+
self.report_stub.collect(generator)
6464

6565

6666
class GrpcProfileTaskChannelService(ProfileTaskChannelService):

skywalking/config.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,7 @@
2222
if TYPE_CHECKING:
2323
from typing import List
2424

25-
# In order to prevent timeouts and possible segment loss make sure QUEUE_TIMEOUT is always at least few seconds lower
26-
# than GRPC_TIMEOUT.
27-
GRPC_TIMEOUT = 300 # type: int
28-
QUEUE_TIMEOUT = 240 # type: int
25+
QUEUE_TIMEOUT = 1 # type: int
2926

3027
RE_IGNORE_PATH = re.compile('^$') # type: re.Pattern
3128

@@ -41,6 +38,7 @@
4138
authentication = os.getenv('SW_AGENT_AUTHENTICATION') # type: str
4239
logging_level = os.getenv('SW_AGENT_LOGGING_LEVEL') or 'INFO' # type: str
4340
disable_plugins = (os.getenv('SW_AGENT_DISABLE_PLUGINS') or '').split(',') # type: List[str]
41+
max_buffer_size = int(os.getenv('SW_AGENT_MAX_BUFFER_SIZE', '1000')) # type: int
4442
sql_parameters_length = int(os.getenv('SW_SQL_PARAMETERS_LENGTH') or '0') # type: int
4543
pymongo_trace_parameters = True if os.getenv('SW_PYMONGO_TRACE_PARAMETERS') and \
4644
os.getenv('SW_PYMONGO_TRACE_PARAMETERS') == 'True' else False # type: bool

skywalking/trace/context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,4 +269,4 @@ def get_context() -> SpanContext:
269269
if spans:
270270
return spans[len(spans) - 1].context
271271

272-
return SpanContext() if agent.connected() else NoopContext()
272+
return SpanContext()

0 commit comments

Comments
 (0)