Update policies using API

Can we update the policies of Nexus IQ Server using APIs? Our current version is 103.

At this time there is no API to modify policies. Are you interested in using an API to make minute adjustments, or is this more for backup/restore, or promoting a full set from a pre-prod test to production?

If it’s one of the latter two, I wrote a small script to batch import and export policies using the undocumented “Import Policies” endpoints.

(Not sure if there’s an easier way, but there wasn’t when I wrote this.)

This is extracted from a larger script, so there may be a few minor errors. I also haven’t tested it in the last few versions of IQ.

import json
import logging
import re
from typing import Callable
from pathlib import Path
from requests import Response, Session

logger = logging.getLogger(__name__)

class IqConfig:

    def __init__(self, username: str, password: str, url: str = "http://localhost:8070"):
        # Configure the default HTTP client.
        self.url = url
        self.session = Session()
        self.session.auth = (username, password)

    def import_policies(self):
        """
        Import the policy set stored in iq-policies.json (next to this script).
        https://help.sonatype.com/iqserver/managing/policy-management#PolicyManagement-ReferencePolicySet
        """
        logger.info("[Policies] Importing policies...")

        cwd = Path(__file__).resolve().parent
        policy_file = Path(cwd, "./iq-policies.json")
        with policy_file.open() as file:
            files = {"file": file}
            res = self._rest_api(
                lambda x: x.post(f"{self.url}/rest/policy/organization/ROOT_ORGANIZATION_ID/import", files=files)
            )
        logger.info("Response is: %s", res)

    def export_policies(self):
        # Retrieve values from server.
        policies = self._rest_api(
            lambda x: x.get(f"{self.url}/rest/policy/organization/ROOT_ORGANIZATION_ID/applicable"),
        )
        labels = self._rest_api(
            lambda x: x.get(f"{self.url}/api/v2/labels/organization/ROOT_ORGANIZATION_ID/applicable"),
        )
        # Transform license groups into expected format.
        # Source:
        # { "id": "762ef2e2", "name": "Copyleft", "threatLevel": 7, "licenses": [] }
        licenseGroups = self._rest_api(
            lambda x: x.get(
                f"{self.url}/rest/licenseThreatGroup/organization/ROOT_ORGANIZATION_ID/applicable"
            ),
        )
        licenseGroups = licenseGroups["licenseThreatGroupsByOwner"][0]
        # Destination
        # { "id": "30b8026ddc654c838fde77dab08a3b18", "ownerId": "ROOT_ORGANIZATION_ID", "name": "Banned", "nameLowercaseNoWhitespace": "banned", "threatLevel": 10 }
        groups = []
        licenses = []
        for group in licenseGroups["licenseThreatGroups"]:
            group["ownerId"] = "ROOT_ORGANIZATION_ID"
            group["nameLowercaseNoWhitespace"] = re.sub(
                r"\s+", "", group["name"].lower(), flags=re.UNICODE
            )
            licenses.extend(group["licenses"])
            del group["licenses"]
            groups.append(group)

        # licenses = __fetch(s, lambda x: x.get("/rest/license"))
        categories = self._rest_api(
            lambda x: x.get(
                f"{self.url}/api/v2/applicationCategories/organization/ROOT_ORGANIZATION_ID/applicable"
            ),
        )

        # Format values
        export = {
            "policies": policies["policiesByOwner"][0]["policies"],
            "labels": labels["labelsByOwner"][0]["labels"],
            "licenseThreatGroups": groups,
            "licenseThreatGroupLicenses": licenses,
            "tags": categories["applicationCategoriesByOwner"][0]["applicationCategories"],
            "policyTags": policies["policiesByOwner"][0]["policyTags"],
        }

        # Dump values
        with open("./iq-policies.json", "w") as fp:
            json.dump(export, fp)

    def _rest_api(self, block: Callable[[Session], Response]) -> dict:
        """
        Makes a request against the IQ REST API and includes the CSRF token.
        This is required for the undocumented "/rest/..." API endpoints.
        """
        cookies = self.session.cookies.get_dict()
        if "CLM-CSRF-TOKEN" not in cookies:
            # requests needs an absolute URL, so prefix the server address.
            res = self.session.get(f"{self.url}/")
            cookies = res.cookies.get_dict()

            if res.status_code != 200:
                raise ValueError(f"Invalid response from server: {res.status_code}")
            elif "CLM-CSRF-TOKEN" not in cookies:
                raise ValueError(f"No CSRF Token returned: {cookies}")

        self.session.headers.update({"X-CSRF-TOKEN": cookies["CLM-CSRF-TOKEN"]})
        res: Response = block(self.session)
        logger.info("Response is (%s): %s", res.status_code, res.text)
        res.raise_for_status()

        # Some endpoints return an empty or non-JSON body; treat that as "no data".
        try:
            return res.json()
        except ValueError:
            return None
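
The license threat group step in export_policies() is the least obvious part, so here is a minimal, standalone sketch of just that transformation. The helper name `to_import_format` is hypothetical, and unlike the loop above it copies the group instead of modifying it in place. The sample record is the one from the comments in the script.

```python
import re


def to_import_format(group: dict, owner_id: str = "ROOT_ORGANIZATION_ID") -> tuple:
    """Split one exported license threat group into (group, licenses).

    Hypothetical helper: mirrors the loop in export_policies(), but
    returns a new dict rather than mutating the input.
    """
    # Copy every field except "licenses", which the import file keeps separately.
    converted = {key: value for key, value in group.items() if key != "licenses"}
    converted["ownerId"] = owner_id
    # Lowercase the name and strip all whitespace, as the script above does.
    converted["nameLowercaseNoWhitespace"] = re.sub(r"\s+", "", group["name"].lower())
    return converted, list(group.get("licenses", []))


sample = {"id": "762ef2e2", "name": "Copyleft", "threatLevel": 7, "licenses": []}
converted, licenses = to_import_format(sample)
print(converted)
```

Keeping this as a pure function makes it easy to check against a known export before pointing the script at a real server.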

Hi @jwhitehouse, we want to automate updating Nexus IQ Server policies via a Jenkins pipeline, so I was looking for an API call that can do it.

To state the exact requirement: we would like to keep the policies in Bitbucket and follow a change process whenever we update a policy. Only once a change is approved would the Jenkins pipeline apply the updated policies.
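
A pipeline like this would probably want a cheap sanity check on the committed file before anything is sent to the server. The sketch below assumes the file has the shape written by export_policies() in the script above; the function name `check_policy_file` is hypothetical, and whether the import endpoint actually requires every one of these keys is an assumption.

```python
import json
from pathlib import Path

# Top-level keys written by export_policies() in the script above.
# Assumption: the import endpoint expects all of them to be present.
EXPECTED_KEYS = {
    "policies",
    "labels",
    "licenseThreatGroups",
    "licenseThreatGroupLicenses",
    "tags",
    "policyTags",
}


def check_policy_file(path: Path) -> list:
    """Return a list of problems; an empty list means the file looks importable."""
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # Covers a missing file as well as malformed JSON.
        return [f"cannot read {path}: {exc}"]
    if not isinstance(data, dict):
        return ["top-level JSON value is not an object"]
    return [f"missing key: {key}" for key in sorted(EXPECTED_KEYS - set(data))]
```

A Jenkins stage could call `check_policy_file(Path("iq-policies.json"))` and fail the build when the returned list is non-empty, before the import step runs.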

Thanks for that use case, @nikhil.nagesh.chodankar. This amounts to treating the policy config as code. One of the challenges is that ‘coding’ policy config is not simple.

I’d consider keeping the UI for configuration and exporting the result, or connecting it to some storage (file, SCM, etc.) for change management and a human-readable backup.
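
For the human-readable backup, one option is to re-serialize the exported JSON before committing it, so that diffs in SCM show only real changes. This is a minimal sketch using the standard library; the function name `normalize` is hypothetical. It only affects key order and whitespace, so list order (for example the order of policies) is left as exported.

```python
import json


def normalize(raw: str) -> str:
    """Re-serialize JSON with sorted keys and indentation for stable diffs."""
    data = json.loads(raw)
    # sort_keys gives a deterministic key order; indent puts one field per line.
    return json.dumps(data, indent=2, sort_keys=True, ensure_ascii=False) + "\n"


compact = '{"threatLevel": 7, "name": "Copyleft"}'
print(normalize(compact))
```

Running the export through a step like this before each commit keeps the file reviewable in a pull request, while the parsed content stays identical.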