"""Module that provides functionality for content manipulation."""fromcopyimportcopyas_copyfromplone.apiimportportalfromplone.api.excimportInvalidParameterErrorfromplone.api.validationimportat_least_one_offromplone.api.validationimportmutually_exclusive_parametersfromplone.api.validationimportrequired_parametersfromplone.app.linkintegrity.exceptionsimportLinkIntegrityNotificationExceptionfromplone.app.uuid.utilsimportuuidToObjectfromplone.uuid.interfacesimportIUUIDfromProducts.CMFCore.DynamicTypeimportDynamicTypefromProducts.CMFCore.WorkflowCoreimportWorkflowExceptionfromzope.componentimportComponentLookupErrorfromzope.componentimportgetMultiAdapterfromzope.componentimportgetSiteManagerfromzope.container.interfacesimportINameChooserfromzope.globalrequestimportgetRequestfromzope.interfaceimportInterfacefromzope.interfaceimportprovidedByimportrandomimporttransaction_marker=[]
@required_parameters("container", "type")
@at_least_one_of("id", "title")
def create(
    container=None,
    type=None,
    id=None,
    title=None,
    safe_id=False,
    **kwargs,  # NOQA: C816, S101
):
    """Create a new content item inside ``container``.

    The object is first created under a working id (the given ``id`` when it
    has to be enforced, a random number otherwise) and is renamed afterwards
    when a final id still has to be chosen.

    :param container: [required] Container object in which to create the new
        object.
    :type container: Folderish content object
    :param type: [required] Type of the object.
    :type type: string
    :param id: Id of the object. When omitted, the final id is derived from
        the title through the container's ``INameChooser``.
    :type id: string
    :param title: Title of the object. When given it is forwarded to the
        factory as the ``title`` keyword argument.
    :type title: string
    :param safe_id: When False, the given id is passed to the factory as is.
        When True, the object is created under a temporary id and then renamed
        to a name picked by the container's ``INameChooser``.
    :type safe_id: boolean
    :param kwargs: Extra keyword arguments forwarded to ``invokeFactory``.
    :returns: Content object
    :raises:
        :class:`~plone.api.exc.InvalidParameterError` when the factory raises
        a ``ValueError`` (``UnicodeDecodeError`` is re-raised untouched).
        Missing parameters are presumably reported by the validation
        decorators - confirm against ``plone.api.validation``.
    :Example: :ref:`content-create-example`
    """
    # Use the requested id directly only when it must be enforced; in every
    # other case start from a throw-away random id.
    if not safe_id and id:
        content_id = id
    else:
        content_id = str(random.randint(0, 99999999))

    if title:
        kwargs["title"] = title

    try:
        container.invokeFactory(type, content_id, **kwargs)
    except UnicodeDecodeError:
        # UnicodeDecodeError is a subclass of ValueError, so it has to be
        # re-raised here or the handler below would swallow it.
        raise
    except ValueError as e:
        allowed_type_ids = sorted(
            fti.getId() for fti in container.allowedContentTypes()
        )
        raise InvalidParameterError(
            "Cannot add a '{obj_type}' object with id={obj_id} to the container {container_path}.\n"
            "Allowed types are:\n"
            "{allowed_types}\n"
            "{message}".format(
                obj_type=type,
                obj_id=content_id,
                container_path="/".join(container.getPhysicalPath()),
                allowed_types="\n".join(allowed_type_ids),
                message=str(e),
            ),
        )

    content = container[content_id]

    # A final id must still be chosen when none was given, or when the given
    # one is only a suggestion (safe_id).
    if safe_id or not id:
        name_chooser = INameChooser(container)
        final_id = name_chooser.chooseName(id or title, content)
        # kacee: a partial commit is needed, otherwise the rename fails
        # because the object is not in the ZODB yet.
        # maurits: tests run fine without this though.
        transaction.savepoint(optimistic=True)
        content.aq_parent.manage_renameObject(content_id, final_id)

    return content
@mutually_exclusive_parameters("path", "UID")
@at_least_one_of("path", "UID")
def get(path=None, UID=None):
    """Look up a content object by path or by UID.

    :param path: Path to the object we want to get, relative to the portal
        root. A path that already starts with the portal's physical path is
        used unchanged.
    :type path: string
    :param UID: UID of the object we want to get.
    :type UID: string
    :returns: Content object, or None when the path cannot be traversed or
        the traversed object is not a ``DynamicType`` instance.
    :Example: :ref:`content-get-example`
    """
    if not path:
        if UID:
            return uuidToObject(UID)
        return None

    site = portal.get()
    site_absolute_path = "/".join(site.getPhysicalPath())
    if not path.startswith(f"{site_absolute_path}"):
        path = "{site_path}{relative_path}".format(
            site_path=site_absolute_path,
            relative_path=path,
        )

    try:
        segments = path.split("/")
        if len(segments) > 1:
            # Parents are traversed unrestricted; only the last step is
            # subject to the security check.
            parent = site.unrestrictedTraverse(segments[:-1])
            content = parent.restrictedTraverse(segments[-1])
        else:
            content = site.restrictedTraverse(segments[-1])
    except (KeyError, AttributeError):
        # When no object is found don't raise an error
        return None

    # Only return a content if it implements DynamicType,
    # which is true for Dexterity content and Comment (plone.app.discussion)
    if isinstance(content, DynamicType):
        return content
    return None
@required_parameters("source")
@at_least_one_of("target", "id")
def move(source=None, target=None, id=None, safe_id=False):
    """Move the object to the target container.

    :param source: [required] Object that we want to move.
    :type source: Content object
    :param target: Target container to which the source object will be moved.
        If no target is specified, the source object's container is used as
        the target, effectively making this operation a rename
        (:ref:`content-rename-example`).
    :type target: Folderish content object
    :param id: Pass this parameter if you want to change the id of the moved
        object on the target location. The rename is delegated to
        :func:`rename`.
    :type id: string
    :param safe_id: Forwarded to :func:`rename`. When True, a new,
        non-conflicting id is chosen instead of enforcing ``id``.
    :type safe_id: boolean
    :returns: Content object that was moved to the target location
    :raises:
        KeyError
        ValueError
    :Example: :ref:`content-move-example`
    """
    source_id = source.getId()
    source_parent = source.aq_parent

    # Test the target against None instead of relying on its truthiness:
    # a container object may evaluate as false, and a supplied target must
    # never be silently ignored. This mirrors the check used by copy().
    if target is not None and source_parent is not target:
        target.manage_pasteObjects(
            source_parent.manage_cutObjects(source_id),
        )
    else:
        # If no target is given the object is probably renamed
        target = source_parent

    if id:
        return rename(obj=target[source_id], new_id=id, safe_id=safe_id)
    return target[source_id]
@required_parameters("obj", "new_id")
def rename(obj=None, new_id=None, safe_id=False):
    """Rename the object inside its current container.

    :param obj: [required] Object that we want to rename.
    :type obj: Content object
    :param new_id: New id of the object.
    :type new_id: string
    :param safe_id: When False, ``new_id`` is passed unchanged to the
        container's ``manage_renameObject``. When True, the container's
        ``INameChooser`` picks the final id based on ``new_id``.
    :type safe_id: boolean
    :returns: Content object that was renamed
    :Example: :ref:`content-rename-example`
    """
    current_id = obj.getId()
    container = obj.aq_parent

    target_id = new_id
    if safe_id:
        target_id = INameChooser(container).chooseName(new_id, obj)

    # Nothing to do when the object already carries the final id.
    if current_id != target_id:
        container.manage_renameObject(current_id, target_id)

    return container[target_id]
@required_parameters("source")
@at_least_one_of("target", "id")
def copy(source=None, target=None, id=None, safe_id=False):
    """Copy the object to the target container.

    :param source: [required] Object that we want to copy.
    :type source: Content object
    :param target: Target container to which the source object will be
        copied. If no target is specified, the source object's container is
        used as the target.
    :type target: Folderish content object
    :param id: Id of the copied object on the target location. If no id is
        provided, the copy keeps whatever id the paste operation assigned.
    :type id: string
    :param safe_id: When False, the given id will be enforced. If the id is
        conflicting with another object in the target container, raise an
        InvalidParameterError. When True, choose a new, non-conflicting id.
    :type safe_id: boolean
    :returns: Content object that was created in the target location
    :raises:
        KeyError,
        ValueError,
        :class:`~plone.api.exc.InvalidParameterError` when ``id`` is already
        taken in the target and ``safe_id`` is False.
    :Example: :ref:`content-copy-example`
    """
    source_id = source.getId()

    if target is None:
        target = source.aq_parent

    # Check for an id conflict *before* pasting. Doing it afterwards leaves
    # an orphaned copy behind when the error is raised, and reports a false
    # conflict when the paste itself assigned the requested id to the copy.
    if id and not safe_id and id in target:
        msg = "Duplicate ID '{0}' in '{1}' for '{2}'"
        raise InvalidParameterError(msg.format(id, target, source))

    copy_info = target.manage_pasteObjects(
        source.aq_parent.manage_copyObjects(source_id),
    )
    new_id = copy_info[0]["new_id"]

    if id:
        # rename() is a no-op when the copy already carries the wanted id.
        return rename(obj=target[new_id], new_id=id, safe_id=safe_id)
    return target[new_id]
@mutually_exclusive_parameters("obj", "objects")
@at_least_one_of("obj", "objects")
def delete(obj=None, objects=None, check_linkintegrity=True):
    """Delete the object(s).

    :param obj: Object that we want to delete.
    :type obj: Content object
    :param objects: Objects that we want to delete.
    :type objects: List of content objects
    :param check_linkintegrity: Raise exception if there are
        linkintegrity-breaches.
    :type check_linkintegrity: boolean
    :raises:
        ValueError
        plone.app.linkintegrity.exceptions.LinkIntegrityNotificationException
    :Example: :ref:`content-delete-example`
    """
    # Normalise both calling conventions to one list of objects.
    if obj:
        to_delete = [obj]
    else:
        to_delete = objects

    # Return early if we have no objects to delete.
    if not to_delete:
        return

    if check_linkintegrity:
        site = portal.get()
        info_view = get_view(
            name="delete_confirmation_info",
            context=site,
            request=site.REQUEST,
        )
        # Look for breaches and raise the exception ourselves.
        breaches = info_view.get_breaches(to_delete)
        if breaches:
            raise LinkIntegrityNotificationException(
                f"Linkintegrity-breaches: {breaches}",
            )

    for item in to_delete:
        item.aq_parent.manage_delObjects([item.getId()])
@required_parameters("obj")
def get_state(obj=None, default=_marker):
    """Get the current workflow state of the object.

    :param obj: [required] Object that we want to get the state for.
    :type obj: Content object
    :param default: Returned if no workflow is defined for the object. When
        it is left at its sentinel value, the workflow lookup is not
        short-circuited.
    :returns: Object's current workflow state, or `default`.
    :rtype: string
    :raises:
        Products.CMFCore.WorkflowCore.WorkflowException
    :Example: :ref:`content-get-state-example`
    """
    workflow_tool = portal.get_tool("portal_workflow")

    default_given = default is not _marker
    if default_given and not workflow_tool.getWorkflowsFor(obj):
        return default

    # This still raises WorkflowException when the workflow state is broken,
    # ie 'review_state' is absent
    return workflow_tool.getInfoFor(ob=obj, name="review_state")
# work backwards from our end state
def _find_path(maps, path, current_state, start_state):
    """Return the shortest transition list leading to ``current_state``.

    The search walks backwards: ``maps`` maps a state id to a list of
    ``(transition_id, from_states)`` pairs describing the transitions that
    end in that state.

    :param maps: Mapping of state id to incoming transitions
    :type maps: dict
    :param path: Transitions already collected (closest to the goal last)
    :type path: list
    :param current_state: State the search currently tries to reach
    :type current_state: string
    :param start_state: State the object is in right now
    :type start_state: string
    :returns: The shortest list of transition ids, or None if there is none
    :rtype: list
    """
    # current_state is missing from maps when it only has outgoing
    # transitions, i.e. an initial state you are not able to return to.
    if current_state not in maps:
        return None

    candidates = []
    for transition_id, from_states in maps[current_state]:
        if transition_id in path:
            # Don't go in a circle
            continue

        extended = [transition_id, *path]
        if start_state in from_states:
            candidates.append(extended)
            continue

        for previous_state in from_states:
            found = _find_path(maps, extended, previous_state, start_state)
            if found:
                candidates.append(found)

    if not candidates:
        return None
    return min(candidates, key=len)


def _wf_transitions_for(workflow, from_state, to_state):
    """Get list of transition IDs required to transition.

    from ``from_state`` to ``to_state``.

    :param workflow: Workflow object which contains states and transitions
    :type workflow: Workflow object
    :param from_state: Current workflow state
    :type from_state: string
    :param to_state: Desired workflow state
    :type to_state: string
    :returns: A list of transitions, or None when ``to_state`` cannot be
        reached
    :rtype: list
    """
    # transition id -> ids of the states that offer this transition
    leaving_states = {}
    for state in workflow.states.objectValues():
        for transition_id in state.getTransitions():
            leaving_states.setdefault(transition_id, []).append(state.getId())

    # destination state id -> [(transition id, states it can start from)]
    incoming = {}
    for transition in workflow.transitions.objectValues():
        entry = (
            transition.getId(),
            leaving_states.get(transition.getId(), []),
        )
        incoming.setdefault(transition.new_state_id, []).append(entry)

    if to_state not in incoming:
        # impossible to reach via this workflow
        return None

    return _find_path(incoming, [], to_state, from_state)


def _transition_to(obj, workflow, to_state, **kwargs):
    """Move ``obj`` from its current state to ``to_state``.

    Uses the first workflow of the object for which a route can be found.
    ``kwargs`` are forwarded to every ``doActionFor`` call.
    """
    for wf in workflow.getWorkflowsFor(obj):
        status = workflow.getStatusOf(wf.getId(), obj)
        if not (status and status.get("review_state")):
            continue

        current_state = status["review_state"]
        if current_state == to_state:
            return

        transitions = _wf_transitions_for(wf, current_state, to_state)
        if not transitions:
            continue

        for transition in transitions:
            try:
                workflow.doActionFor(obj, transition, **kwargs)
            except WorkflowException:
                # Take automatic transitions into account: when one of them
                # already brought the object to the wanted state, the
                # remaining transitions have to be skipped.
                if get_state(obj) == to_state:
                    break

        # Only the first workflow that offers a route is used.
        return
@required_parameters("obj")
@at_least_one_of("transition", "to_state")
@mutually_exclusive_parameters("transition", "to_state")
def transition(obj=None, transition=None, to_state=None, **kwargs):
    """Perform a workflow transition.

    Either fire the named ``transition`` on the object, or try to perform
    whatever transitions are needed to bring the object to ``to_state``.
    The latter will not guarantee that transition guard conditions can be
    met.

    Accepts kwargs to supply to the workflow policy in use, such as "comment"

    :param obj: [required] Object for which we want to perform the workflow
        transition.
    :type obj: Content object
    :param transition: Name of the workflow transition.
    :type transition: string
    :param to_state: Name of the workflow state.
    :type to_state: string
    :raises:
        :class:`~plone.api.exc.MissingParameterError`,
        :class:`~plone.api.exc.InvalidParameterError`
    :Example: :ref:`content-transition-example`
    """
    workflow_tool = portal.get_tool("portal_workflow")

    if transition is None:
        # State-driven mode: walk towards the requested state and verify
        # afterwards that the object really arrived there.
        _transition_to(obj, workflow_tool, to_state, **kwargs)
        if workflow_tool.getInfoFor(obj, "review_state") != to_state:
            raise InvalidParameterError(
                "Could not find workflow to set state to {} on {}".format(
                    to_state,
                    obj,
                ),
            )
        return

    try:
        workflow_tool.doActionFor(obj, transition, **kwargs)
    except WorkflowException:
        # List the transitions that are available to help the caller.
        valid_ids = sorted(
            action["id"] for action in workflow_tool.listActions(object=obj)
        )
        raise InvalidParameterError(
            "Invalid transition '{}'.\n"
            "Valid transitions are:\n"
            "{}".format(transition, "\n".join(valid_ids)),
        )
@required_parameters("obj")
def disable_roles_acquisition(obj=None):
    """Disable acquisition of local roles on given obj.

    Delegates to the ``plone_utils`` tool by calling
    ``acquireLocalRoles(obj, status=0)``.

    :param obj: [required] Context object to block the acquisition on.
    :type obj: Content object
    :Example: :ref:`content-disable-roles-acquisition-example`
    """
    portal.get_tool("plone_utils").acquireLocalRoles(obj, status=0)
@required_parameters("obj")
def enable_roles_acquisition(obj=None):
    """Enable acquisition of local roles on given obj.

    Delegates to the ``plone_utils`` tool by calling
    ``acquireLocalRoles(obj, status=1)``.

    :param obj: [required] Context object to enable the acquisition on.
    :type obj: Content object
    :Example: :ref:`content-enable-roles-acquisition-example`
    """
    portal.get_tool("plone_utils").acquireLocalRoles(obj, status=1)
@required_parameters("name", "context")
def get_view(name=None, context=None, request=None):
    """Get a BrowserView object.

    :param name: [required] Name of the view.
    :type name: string
    :param context: [required] Context on which to get view.
    :type context: context object
    :param request: Request on which to get view. When omitted, the result
        of ``getRequest()`` is used.
    :type request: request object
    :returns: The view, or None when the adapter lookup fails although a
        view with that name is listed for the context and request.
    :raises:
        :class:`~plone.api.exc.MissingParameterError`,
        :class:`~plone.api.exc.InvalidParameterError`
    :Example: :ref:`content-get-view-example`
    """
    if request is None:
        request = getRequest()

    try:
        return getMultiAdapter((context, request), name=name)
    except ComponentLookupError:
        # The lookup failed: collect the names of every view available for
        # this context and request, to build a helpful error message.
        site_manager = getSiteManager()
        registrations = site_manager.adapters.lookupAll(
            required=(providedBy(context), providedBy(request)),
            provided=Interface,
        )
        known_names = [registration[0] for registration in registrations]

        if name not in known_names:
            # Raise an error if the requested view is not available.
            raise InvalidParameterError(
                "Cannot find a view with name '{name}'.\n"
                "Available views are:\n"
                "{views}".format(
                    name=name,
                    views="\n".join(sorted(known_names)),
                ),
            )

    # The name is listed but the lookup still failed: the caller gets None.
    return None
@required_parameters("obj")
def get_uuid(obj=None):
    """Get the object's Universally Unique IDentifier (UUID).

    The value is obtained by adapting the object to ``IUUID``.

    :param obj: [required] Object we want its UUID.
    :type obj: Content object
    :returns: Object's UUID
    :rtype: string
    :raises:
        ValueError
    :Example: :ref:`content-get-uuid-example`
    """
    uuid = IUUID(obj)
    return uuid
def _parse_object_provides_query(query):
    """Create a query for the object_provides index.

    Every value carrying an ``__identifier__`` attribute is replaced by that
    identifier; other values are passed through unchanged.

    :param query: [required]
    :type query: Single (or list of) Interface or Identifier or a KeywordIndex
        query for multiple values
        (eg. `{'query': [Iface1, Iface2], 'operator': 'or'}`)
    :returns: Dict with ``query``/``operator`` keys when there is something
        to search for, and a ``not`` key when exclusions were given.
    :rtype: dict
    """

    def as_identifiers(value):
        # Accept a single value as well as a list/tuple of values.
        values = value if isinstance(value, (list, tuple)) else [value]
        return [getattr(item, "__identifier__", item) for item in values]

    if isinstance(query, dict):
        # KeywordIndex also supports "range",
        # but that's not useful for querying object_provides
        wanted = query.get("query", [])
        operator = query.get("operator", "or")
        excluded = query.get("not", [])
    else:
        wanted, operator, excluded = query, "or", []

    wanted = as_identifiers(wanted)
    excluded = as_identifiers(excluded)

    parsed = {}
    if wanted:
        parsed.update(query=wanted, operator=operator)
    if excluded:
        parsed["not"] = excluded
    return parsed
def find(context=None, depth=None, unrestricted=False, **kwargs):
    """Find content in the portal.

    :param context: Context for the search
    :type context: Content object
    :param depth: How far in the content tree we want to search from context
    :type depth: integer
    :param unrestricted: Boolean, use unrestrictedSearchResults if True
    :type unrestricted: boolean
    :param kwargs: Catalog query, one keyword argument per index. An
        ``object_provides`` value is normalised through
        ``_parse_object_provides_query``.
    :returns: Catalog brains; an empty list when the query names no index
        known to the catalog.
    :rtype: List
    :Example: :ref:`content-find-example`
    """
    query = dict(kwargs)

    # Save the original path to maybe restore it later.
    orig_path = query.get("path")
    if isinstance(orig_path, dict):
        orig_path = orig_path.get("query")

    # Passing a context or depth overrides the existing path query,
    # for now.
    # The context is compared against None, not tested for truthiness, so
    # that this guard agrees with the ``context is not None`` branch below.
    # Otherwise a context that evaluates as false would skip the creation
    # of the path dictionary and fail on the item assignment further down.
    if context is not None or depth is not None:
        # Make the path a dictionary, unless it already is.
        if not isinstance(orig_path, dict):
            query["path"] = {}

    # Limit search depth
    if depth is not None:
        # If we don't have a context, we'll assume the portal root.
        if context is None and not orig_path:
            context = portal.get()
        else:
            # Restore the original path
            query["path"]["query"] = orig_path
        query["path"]["depth"] = depth

    if context is not None:
        query["path"]["query"] = "/".join(context.getPhysicalPath())

    # Convert interfaces to their identifiers and also allow to query
    # multiple values using {'query:[], 'operator':'and|or'}
    obj_provides = query.get("object_provides", [])
    if obj_provides:
        query["object_provides"] = _parse_object_provides_query(obj_provides)

    # Make sure we don't dump the whole catalog.
    catalog = portal.get_tool("portal_catalog")
    indexes = catalog.indexes()
    if not any(index in indexes for index in query):
        return []

    if unrestricted:
        return catalog.unrestrictedSearchResults(**query)
    return catalog(**query)