forked from DataDog/dd-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
emitter.py
141 lines (117 loc) · 4.56 KB
/
emitter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# (C) Datadog, Inc. 2010-2016
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
# stdlib
from hashlib import md5
import logging
import re
import zlib
import unicodedata
# 3p
import requests
import simplejson as json
# project
from config import get_version
from utils.proxy import set_no_proxy_settings
set_no_proxy_settings()

# urllib3 (as bundled under requests.packages) is chatty at INFO level: only
# let WARN and above through, while still propagating records to parent loggers.
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.WARN)
requests_log.propagate = True

# Characters treated as "control" for plain strings: code points 0-31 (C0),
# 127 (DEL) and 128-159 (C1).
# Adapted from http://stackoverflow.com/questions/92438/stripping-non-printable-characters-from-a-string-in-python
control_chars = u''.join(unichr(code) for code in range(0, 32) + range(127, 160))
# One character class matching any of the characters above.
control_char_re = re.compile(u'[%s]' % re.escape(control_chars))
def remove_control_chars(s, log):
    """Strip control characters from a string, logging when anything is removed.

    Byte strings (``str``) are cleaned with the module-level ``control_char_re``
    pattern. Unicode strings drop every character whose Unicode category is
    ``Cc`` (control) or ``Cf`` (format). Any other value is returned unchanged,
    so callers may safely pass non-string values such as integer dict keys.

    :param s: value to sanitize.
    :param log: logger used to warn when a string was modified.
    :returns: the sanitized string, or ``s`` itself when it is not a string.
    """
    # Default to the input so non-string values pass straight through instead
    # of reaching the comparison below with ``sanitized`` unbound.
    sanitized = s
    if isinstance(s, str):
        # NOTE(review): control_char_re also matches code points 0x80-0x9F.
        # Applied to a byte string, that presumably hits UTF-8 continuation
        # bytes too (e.g. the middle byte of u'\u20ac'), mangling otherwise
        # valid multi-byte characters -- confirm this is intended.
        sanitized = control_char_re.sub('', s)
    elif isinstance(s, unicode):
        sanitized = u''.join(c for c in s
                             if unicodedata.category(c) not in ('Cc', 'Cf'))
    if sanitized != s:
        log.warning('Removed control chars from string: ' + s)
    return sanitized
def remove_undecodable_chars(s, log):
    """Drop bytes that are not valid UTF-8 from a byte string.

    Only ``str`` values are examined. Any other value, and any ``str`` that
    decodes cleanly as UTF-8, is returned unchanged. Otherwise the string is
    decoded with the invalid bytes ignored. A warning showing the string with
    replacement characters is logged, and the decoded (unicode) result is
    returned.
    """
    if not isinstance(s, str):
        return s
    try:
        s.decode('utf8')
    except UnicodeDecodeError:
        cleaned = s.decode('utf8', errors='ignore')
        log.warning(u'Removed undecodable chars from string: ' + s.decode('utf8', errors='replace'))
        return cleaned
    return s
def sanitize_payload(item, log, sanitize_func):
    """Return a copy of *item* with *sanitize_func* applied to every string in it.

    Dicts, lists and tuples are rebuilt recursively, dict keys included.
    Strings are passed to ``sanitize_func(string, log)``. Any other value is
    returned unchanged.

    :param item: payload fragment to sanitize.
    :param log: logger forwarded to *sanitize_func*.
    :param sanitize_func: callable ``(string, log) -> string``.
    :returns: the sanitized copy. Input containers are never modified in place;
        list subclasses come back as plain lists, tuples as plain tuples.
    """
    if isinstance(item, dict):
        cleaned = {}
        for key, value in item.items():
            new_value = sanitize_payload(value, log, sanitize_func)
            # Keys use the same type dispatch as values, so only string keys
            # reach sanitize_func; non-string keys (e.g. ints) are kept as-is.
            new_key = sanitize_payload(key, log, sanitize_func)
            # NOTE(review): two keys that sanitize to the same string collapse
            # into one entry; the last one visited in dict order wins.
            cleaned[new_key] = new_value
        return cleaned
    if isinstance(item, (list, tuple)):
        cleaned = [sanitize_payload(element, log, sanitize_func) for element in item]
        return tuple(cleaned) if isinstance(item, tuple) else cleaned
    if isinstance(item, basestring):
        return sanitize_func(item, log)
    return item
def _serialize_message(message, log):
    """JSON-encode *message*, sanitizing its strings if encoding fails.

    Encoding is tried up to three times, each retry happening only if the
    previous attempt raised ``UnicodeDecodeError``:
      1. the message as-is;
      2. after ``remove_control_chars`` is applied to every string;
      3. after ``remove_undecodable_chars`` is also applied.

    :param message: payload to encode.
    :param log: logger for sanitization and error messages.
    :returns: the JSON string, or None when the message cannot be encoded
        (the failure is logged, not raised).
    """
    try:
        try:
            return json.dumps(message)
        except UnicodeDecodeError:
            message = sanitize_payload(message, log, remove_control_chars)
        try:
            return json.dumps(message)
        except UnicodeDecodeError:
            log.info('Removing undecodable characters from payload')
            message = sanitize_payload(message, log, remove_undecodable_chars)
        return json.dumps(message)
    except UnicodeDecodeError as ude:
        log.error('http_emitter: Unable to convert message to json %s', ude)
    except RuntimeError as rte:
        log.error('http_emitter: runtime error dumping message to json %s', rte)
    except Exception as e:
        log.error('http_emitter: unknown exception processing message %s', e)
    return None


def http_emitter(message, log, agentConfig, endpoint):
    """Send payload.

    Serializes *message* to JSON, deflates it and POSTs it to
    ``<dd_url>/intake/<endpoint>?api_key=<apiKey>``.

    :param message: payload dict; its ``apiKey`` entry authenticates the post.
    :param log: logger.
    :param agentConfig: agent configuration; ``dd_url`` is read here and
        ``version`` by ``post_headers``.
    :param endpoint: intake endpoint name inserted into the URL.
    :raises Exception: if the message serializes but has no ``apiKey``.

    Serialization and HTTP failures are logged rather than raised.
    """
    url = agentConfig['dd_url']
    log.debug('http_emitter: attempting postback to ' + url)

    payload = _serialize_message(message, log)
    if payload is None:
        # Encoding failed (already logged); there is nothing to send.
        return

    zipped = zlib.compress(payload)
    log.debug("payload_size=%d, compressed_size=%d, compression_ratio=%.3f",
              len(payload), len(zipped), float(len(payload)) / len(zipped))

    apiKey = message.get('apiKey', None)
    if not apiKey:
        raise Exception("The http emitter requires an api key")

    url = "{0}/intake/{1}?api_key={2}".format(url, endpoint, apiKey)
    # Bound before the request so the handler can tell whether a response
    # was received at all.
    r = None
    try:
        headers = post_headers(agentConfig, zipped)
        r = requests.post(url, data=zipped, timeout=5, headers=headers)
        r.raise_for_status()
        if 200 <= r.status_code < 205:
            log.debug("Payload accepted")
    except Exception:
        log.exception("Unable to post payload.")
        # Compare with None explicitly: a requests Response is falsy for
        # error statuses, which are exactly the ones worth reporting.
        if r is not None:
            log.error("Received status code: {0}".format(r.status_code))
def post_headers(agentConfig, payload):
    """Build the HTTP headers for posting a deflated JSON *payload*.

    :param agentConfig: agent configuration; its ``version`` entry goes into
        the User-Agent header.
    :param payload: the request body, hashed for the ``Content-MD5`` header.
    :returns: dict mapping header name to value.
    """
    user_agent = 'Datadog Agent/%s' % agentConfig['version']
    # NOTE(review): this is the hex digest, not the base64 form RFC 1864
    # describes; presumably what the intake expects, so it is left as-is.
    body_digest = md5(payload).hexdigest()
    headers = {
        'User-Agent': user_agent,
        'Content-Type': 'application/json',
        'Content-Encoding': 'deflate',
        'Accept': 'text/html, */*',
        'Content-MD5': body_digest,
    }
    headers['DD-Collector-Version'] = get_version()
    return headers