
    BTh                        d dl Z d dlZd dlZd dlZd dlmZ d dlZej                  Zej                  j                  Z	 G d de
      Z G d de
      Zd Zej                  d        Zej                  d	        Zd
efdZddZd Zd ZddZd Zd Zd Zd Zded
efdZded
ej:                  e   fdZdej>                  e   d
ej@                  eej:                  e   f   fdZ!ddZ"d Z#y)    N)defaultdictc                        e Zd Z fdZ xZS )SocketBlockedErrorc                 $    t         |   d       y )Nz"A test tried to use socket.socket.)super__init__)self_args_kwargs	__class__s      _/var/www/catia.catastroantioquia-mas.com/valormas/lib/python3.12/site-packages/pytest_socket.pyr   zSocketBlockedError.__init__   s    =>    __name__
__module____qualname__r   __classcell__r   s   @r   r   r      s    ? ?r   r   c                        e Zd Z fdZ xZS )SocketConnectBlockedErrorc                 X    |rdj                  |      }t        | 	  d| d| d       y )N,z7A test tried to use socket.socket.connect() with host "z" (allowed: "z").)joinr   r   )r	   allowedhostr
   r   r   s        r   r   z"SocketConnectBlockedError.__init__   s8    hhw'G}WIS:	
r   r   r   s   @r   r   r      s    
 
r   r   c                     | j                  d      }|j                  dddd       |j                  dddd	       |j                  d
ddd       |j                  dddd       y )Nsocket--disable-socket
store_truedisable_socketz8Disable socket.socket by default to block network calls.)actiondesthelp--force-enable-socketforce_enable_socketzEForce enable socket.socket network calls (override --disable-socket).--allow-hostsallow_hostsALLOWED_HOSTS_CSVzGOnly allow specified hosts through socket.socket.connect((host, port)).)r"   metavarr#   --allow-unix-socketallow_unix_socketz.Allow calls if they are to Unix domain sockets)getgroup	addoption)parsergroups     r   pytest_addoptionr0      s    OOH%E	OOG	   
OO"T	   
OO#V	   
OO =	  r   c              #   @   K   t        | j                         d yw)z8disable socket.socket for duration of this test functionr+   N)r    __socket_allow_unix_socketpytestconfigs    r   socket_disabledr6   8   s      \%L%LM	s   c              #   (   K   t                d yw)z7enable socket.socket for duration of this test functionN)enable_socketr4   s    r   socket_enabledr9   ?   s      O	s   returnc                 R    	 | t         j                  k(  }|S # t        $ r d}Y |S w xY wNF)r   AF_UNIXAttributeError)familyis_unix_sockets     r   _is_unix_socketrA   F   s9    6>>1   s    &&c                 R      G  fddt         j                         }|t         _         y)zAdisable socket.socket to disable the Internet. useful in testing.c                   (     e Zd ZdZd fd	Z xZS )%disable_socket.<locals>.GuardedSocketz<socket guard to disable socket creation (from pytest-socket)c                 X    t        |      rrt        | 	  | ||||      S t               N)rA   r   __new__r   )clsr?   typeprotofilenor   r+   s        r   rG   z-disable_socket.<locals>.GuardedSocket.__new__U   s.    v&+<wsFD%HH$&&r   )rL   rL   N)r   r   r   __doc__rG   r   )r   r+   s   @r   GuardedSocketrD   R   s    J	' 	'r   rN   N)r   )r+   rN   s   ` r   r    r    O   s    ' ' "FMr   c                  "    t         t        _        y)zBre-enable socket.socket to enable the Internet. useful in testing.N)_true_socketr    r   r   r8   r8   ^   s	     FMr   c                     | j                  dd       | j                  dd       | j                  dd       | j                  d      | _        | j                  d      | _        | j                  d      | _        | j                  d      | _        y )	Nmarkersz@disable_socket(): Disable socket connections for a specific testz>enable_socket(): Enable socket connections for a specific testzIallow_hosts([hosts]): Restrict socket connection to defined list of hostsr$   r   r*   r&   )addinivalue_line	getoption__socket_force_enabled__socket_disabledr3   __socket_allow_hosts)configs    r   pytest_configurerZ   c   s    
U S S %+$4$45L$MF!%//0BCF(.(8(89N(OF%"("2"2?"CFr   c                    t        | d      syd| j                  v s'| j                  d      s| j                  j                  rt                yd| j                  v s| j                  d      r t        | j                  j                         yt        |       }| j                  j                  r#|s t        | j                  j                         yyy)a  During each test item's setup phase,
    choose the behavior based on the configurations supplied.

    This is the bulk of the logic for the plugin.
    As the logic can be extensive, this method is allowed complexity.
    It may be refactored in the future to be more readable.

    If the given item is not a function test (i.e a DoctestItem)
    or otherwise has no support for fixtures, skip it.
    fixturenamesNr9   r8   r6   r    )
hasattrr\   get_closest_markerrY   rV   r8   r    r3   _resolve_allow_hostsrW   )itemhostss     r   pytest_runtest_setuprb   v   s     4(
 	D---""?3;;-- D---1H1H2 	t{{==> !&E {{$$Ut{{==> .3$r   c                     | j                  d      }| j                  j                  }d}|r|j                  d   }n|r|}t	        || j                  j
                         |S )z Resolve `allow_hosts` behaviors.r'   Nr   r2   )r^   rY   rX   argssocket_allow_hostsr3   )r`   mark_restrictionscli_restrictionsra   s       r   r_   r_      s\    //>{{77E!&&q)	 u0V0VWLr   c                      t                y rF   )_remove_restrictionsrQ   r   r   pytest_runtest_teardownrj      s    r   c                 2    | d   }t        |t              r|S y Nr   )
isinstancestr)addressr   s     r   host_from_addressrp      s    1:D$ r   c                 D    | d   }t        |t              rt        |      S y rl   )rm   tuplerp   )rd   ro   s     r   host_from_connect_argsrs      s%    1gG'5! )) "r   ro   c                 N    	 t        j                  |        y# t        $ r Y yw xY w)zC
    Determine if the address is a valid IPv4 or IPv6 address.
    TF)	ipaddress
ip_address
ValueError)ro   s    r   is_ipaddressrx      s*    W% s    	$$hostnamec                     	 t        j                  | d       D ch c]
  ^ }}|d    c}}S c c}}w # t         j                  $ r t               cY S w xY wrl   )r   getaddrinfogaierrorset)ry   _addr_structs      r   resolve_hostnamesr      sU    282D2DXt2T
.;KN
 	
 
 ?? us   4 .4 4 AAallowed_hostsc                     t        t              }| D ]O  }|j                         }t        |      r||   j	                  |       3||   j                  t        |             Q |S )z1Map all items in `allowed_hosts` to IP addresses.)r   r}   striprx   addupdater   )r   ip_hostsr   s      r   normalize_allowed_hostsr      sa     3H ;zz|TNt$TN!!"3D"9:; Or   c                 0   t        | t              r| j                  d      } t        | t              syt	        |       }t        t        j                  |j                                t        |j                               z  t        |j                         D cg c]L  \  }}t        |      dk(  rt        t        |            |k(  r|n| ddj                  t        |             dN c}}      fd}|t         j                   _        yc c}}w )zKdisable socket.socket.connect() to disable the Internet. useful in testing.r   N   z ()c                     t        |      }|v st        | j                        rrt        | g| S t	        |      rF   )rs   rA   r?   _true_connectr   )instrd   r   r+   allowed_ip_hosts_and_hostnamesallowed_lists      r   guarded_connectz+socket_allow_hosts.<locals>.guarded_connect   sB    %d+11DKK(-> ---'d;;r   )rm   rn   splitlistr   r}   	itertoolschainvalueskeyssorteditemslennextiterr   r   connect)r   r+   allowed_ip_hosts_by_hostr   
normalizedr   r   r   s    `    @@r   re   re      s    '3--$gt$6w?%(188:;&$))+,&-"  %=$B$B$D	
 !j z?a'Dj1A,Bd,J vR
); <=Q?@	
	L< ,FMM'	
s   AD
c                  T    t         t        _        t        t        j                  _        y)zKrestore socket.socket.* to allow access to the Internet. useful in testing.N)rP   r   r   r   rQ   r   r   ri   ri     s     FM)FMMr   )F)r:   Nr<   )$ru   r   r   typingcollectionsr   pytestrP   r   r   RuntimeErrorr   r   r0   fixturer6   r9   boolrA   r    r8   rZ   rb   r_   rj   rp   rs   rn   rx   Setr   ListDictr   re   ri   rQ   r   r   <module>r      s       # }}%%? ?

 
8 
 
 
 
t "!
D&$?N
*# $  

3 ;;s#[[fjjo%& ,F*r   