+ "code": "\n# encoding = utf-8\n\nimport datetime\nimport hashlib\nimport json\nimport os\nimport requests\nimport ssl\nimport sys\nimport time\n\n'''\n IMPORTANT\n Edit only the validate_input and collect_events functions.\n Do not edit any other part in this file.\n This file is generated only once when creating the modular input.\n'''\n\nCHECKPOINT_KEY_SUFFIX="since"\n\n\ndef validate_input(helper, definition):\n """Implement your own validation logic to validate the input stanza configurations"""\n # This example accesses the modular input variable\n # sync_type = definition.parameters.get('sync_type', None)\n # search_filter = definition.parameters.get('search_filter', None)\n # services = definition.parameters.get('services', None)\n pass\n\ndef collect_events(helper, ew):\n # (log_level can be "debug", "info", "warning", "error" or "critical", case insensitive)\n #helper.set_log_level("debug")\n\n use_proxy = False\n if len(helper.get_proxy()) > 0:\n use_proxy = True\n\n # XXX: Special code to use system SSL trust store\n _patch_rest_helper(helper, use_proxy)\n\n stanza = helper.get_input_stanza_names()\n if isinstance(stanza, list):\n stanza = stanza[0]\n checkpoint_key = f"{stanza}_{CHECKPOINT_KEY_SUFFIX}"\n\n # Get options\n opt_sync_type = helper.get_arg('sync_type')\n opt_search_filter = helper.get_arg('search_filter')\n if opt_search_filter == None:\n opt_search_filter = ""\n opt_services = helper.get_arg('services')\n if opt_services == None:\n opt_services = ""\n opt_page_size = helper.get_arg('batch_size')\n if opt_page_size == None or opt_page_size == "":\n opt_page_size = "10000"\n\n # Get account credentials\n # NOTE: When testing inside the add-on builder UI, only username/password are\n # supported. Add-on builder will overwrite the api_endpoint/api_key configuration\n # in the source files, so make sure you manually correct that after saving!!!\n global_account = helper.get_arg('global_account')\n if 'api_endpoint' in global_account:\n api_endpoint = global_account['api_endpoint']\n elif 'username' in global_account:\n # Work around when testing inside add-on builder\n # add-on builder doesn't support custom global account parameters\n api_endpoint = global_account['username']\n if 'api_key' in global_account:\n api_key = global_account['api_key']\n elif 'password' in global_account:\n # Work around when testing inside add-on builder\n # add-on builder doesn't support custom global account parameters\n api_key = global_account['password']\n\n try:\n opt_since = int(float(helper.get_check_point(checkpoint_key)))\n except (ValueError, TypeError):\n opt_since = 0\n\n if opt_since == 0:\n # try to find the old checkpoint value from the previous plugin version:\n splunk_home = os.environ.get('SPLUNK_HOME')\n old_chkpt_key = hashlib.sha256(stanza.encode("utf-8")).hexdigest()\n old_chkpt_file = os.path.join(splunk_home or "", 'var', 'lib', 'splunk', 'modinputs', 'assets', old_chkpt_key)\n helper.log_debug(f"Could not find checkpoint in kvstore, so attempting to read from file: {old_chkpt_file}")\n if os.path.isfile(old_chkpt_file):\n try:\n with open(old_chkpt_file, "r") as f:\n old_chkpt_val = json.loads(f.read())\n if old_chkpt_val and old_chkpt_val['data'] and old_chkpt_val['data']['since']:\n opt_since = int(float(old_chkpt_val['data']['since']))\n except Exception as e:\n # ignore errors, we did our best\n helper.log_debug(f"Could not find old checkpoint: {e}")\n\n headers = {"Authorization": f"Bearer {api_key}"}\n checkpoint_ts = opt_since\n\n # Page through API\n 
start_key = ""\n cnt = 0\n helper.log_debug(f"Fetching assets {opt_sync_type} since {checkpoint_ts}")\n while True:\n url = f"https://{api_endpoint}/api/v1.0/export/org/assets/sync/{opt_sync_type}/assets.json?search={opt_search_filter}&since={opt_since}&services={opt_services}&start_key={start_key}&page_size={opt_page_size}"\n response = helper.send_http_request(url, "GET", parameters=None, payload=None,\n headers=headers, cookies=None, verify=True, cert=None,\n timeout=(10.0, 300), use_proxy=use_proxy)\n # check the response status, if the status is not sucessful, raise requests.HTTPError\n response.raise_for_status()\n\n r_json = response.json()\n if r_json == None or r_json['assets'] == None:\n helper.log_error("assets array missing")\n raise("assets array missing")\n assets = r_json['assets']\n\n if len(assets) == 0:\n break\n\n for asset in assets:\n event_ts = 0\n try:\n if opt_sync_type == 'created':\n event_ts = float(asset['created_at'])\n else:\n event_ts = float(asset['updated_at'])\n if event_ts > checkpoint_ts:\n checkpoint_ts = event_ts\n except (ValueError, TypeError):\n event_ts = time.time()\n helper.log_debug(f"Could not determine {opt_sync_type} time for asset.id={asset['id']}. Using current time ({event_ts}) for _splunk_event_ts instead")\n\n # TODO: Splunk isn't respecting the time sent through the time event. But why?\n # We're adding extra timestamps to the JSON asset and using TIMESTAMP_FIELDS=splunk_event_ts\n # in props.conf to work around.\n asset['_splunk_event_ts'] = event_ts\n asset['_splunk_ingest_ts'] = time.time()\n\n # Write the event to Splunk index\n event = helper.new_event(source=helper.get_input_type(), index=helper.get_output_index(), sourcetype=helper.get_sourcetype(), data=json.dumps(asset), time=event_ts, done=True, unbroken=True)\n ew.write_event(event)\n cnt += 1\n\n # Older versions of runZero don't support pagination\n # If the response isn't paged, finish now to avoid an infinite loop\n if "next_key" not in r_json:\n helper.log_debug("Batch fetching was not supported by the console, so ingested all records in one request.")\n break\n\n if r_json["next_key"] == "":\n break\n\n start_key = r_json["next_key"]\n\n if cnt > 0:\n helper.log_info(f"Successfully imported {cnt} assets {opt_sync_type} since {opt_since}.")\n else:\n helper.log_info(f"No assets {opt_sync_type} since {opt_since} to import.")\n\n # Save checkpoint so we'll only refresh newly created/updated assets on next iteration\n if checkpoint_ts > opt_since:\n helper.log_debug(f"Saving {checkpoint_ts} checkpoint to {checkpoint_key}")\n helper.save_check_point(checkpoint_key, int(checkpoint_ts))\n\n\nclass CustomHTTPAdapter(requests.adapters.HTTPAdapter):\n def init_poolmanager(self, *args, **kwargs):\n ca_files = []\n if sys.platform.startswith('linux'):\n ca_files = [\n '/etc/pki/tls/certs/ca-bundle.crt', # RedHat\n '/etc/pki/tls/cert.pem', # Fedora <= 34, RHEL <= 9, CentOS <= 9\n '/etc/ssl/certs/ca-certificates.crt', # Debian\n '/etc/ssl/cert.pem', # Alpine, Arch, Fedora 34+, OpenWRT, RHEL 9+, BSD\n '/etc/ssl/ca-bundle.pem', # SUSE\n ]\n elif sys.platform.startswith('darwin'):\n ca_files = ['/usr/local/etc/openssl/cert.pem']\n\n ssl_ctx = ssl.create_default_context()\n for ca_file in ca_files:\n if os.path.isfile(ca_file):\n ssl_ctx.load_verify_locations(ca_file)\n\n super().init_poolmanager(*args, **kwargs, ssl_context=ssl_ctx)\n\ndef _patch_rest_helper(helper, use_proxy):\n # Monkey-patch the underlying requests session provided by\n # the AOB library (typically in 
splunk_aoblib/rest_helper.py)\n # to use the system trust store.\n if not (helper and helper.rest_helper):\n return\n\n http_session = requests.Session()\n http_session.mount(\n 'http://', requests.adapters.HTTPAdapter(max_retries=3))\n http_session.mount(\n 'https://', CustomHTTPAdapter(max_retries=3))\n helper.rest_helper.http_session = http_session\n\n # Now set the requests_proxy variable, since this is normally\n # set at the same time as the http_session.\n if use_proxy:\n proxy_uri = None\n proxy = helper.get_proxy()\n if proxy and proxy.get('proxy_url') and proxy.get('proxy_type'):\n proxy_uri = proxy['proxy_url']\n if proxy.get('proxy_port'):\n proxy_uri = '{0}:{1}'.format(proxy_uri, proxy.get('proxy_port'))\n if proxy.get('proxy_username') and proxy.get('proxy_password'):\n proxy_uri = '{0}://{1}:{2}@{3}/'.format(proxy['proxy_type'], proxy[\n 'proxy_username'], proxy['proxy_password'], proxy_uri)\n else:\n proxy_uri = '{0}://{1}'.format(proxy['proxy_type'], proxy_uri)\n helper.rest_helper.requests_proxy = {'http': proxy_uri, 'https': proxy_uri}\n",
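

# ---------------------------------------------------------------------------
# Standalone sanity check: a minimal sketch, NOT invoked by Splunk and not part
# of the generated modular input. Running this file directly exercises
# CustomHTTPAdapter against a public HTTPS endpoint to confirm that the system
# trust store is being picked up. The URL is a placeholder assumption;
# substitute your own console endpoint as needed.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    session = requests.Session()
    session.mount('https://', CustomHTTPAdapter(max_retries=3))
    resp = session.get('https://example.com', timeout=10)
    print(f"status={resp.status_code} (verified against the system trust store)")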