分类 微服务 下的文章

(7)学习笔记 ) ASP.NET CORE微服务 Micro-Service ---- 利用Polly+AOP+依赖注入封装的降级框架


创建简单的熔断降级框架
要达到的目标是: 参与降级的方法参数要一样,当HelloAsync执行出错的时候执行HelloFallBackAsync方法。

public class Person
{
  [HystrixCommand("HelloFallBackAsync")]  
  public virtual async Task<string> HelloAsync(string name)
  {
    Console.WriteLine("hello"+name);
    return "ok";
  }
  public async Task<string> HelloFallBackAsync(string name)
  {
    Console.WriteLine("执行失败"+name); 
    return "fail";
  }
}

1、编写 HystrixCommandAttribute

using AspectCore.DynamicProxy;
using System;
using System.Threading.Tasks;
namespace hystrixtest1
{
  //限制这个特性只能标注到方法上
  [AttributeUsage(AttributeTargets.Method)]     
  public class HystrixCommandAttribute : AbstractInterceptorAttribute
  {         
    public HystrixCommandAttribute(string fallBackMethod)
    {             
      this.FallBackMethod = fallBackMethod;
    }          
    public string FallBackMethod { get; set; }         
    public override async Task Invoke(AspectContext context, AspectDelegate next)
    {    
      try             
      {                 
        await next(context);//执行被拦截的方法
      }
      catch (Exception ex)
      {
        //context.ServiceMethod被拦截的方法。context.ServiceMethod.DeclaringType被拦截方法所在的类
        //context.Implementation实际执行的对象p
        //context.Parameters方法参数值
        //如果执行失败,则执行FallBackMethod
        
        //调用降级方法
        //1.调用降级的方法(根据对象获取类,从类获取方法)
        var fallBackMethod = context.ServiceMethod.DeclaringType.GetMethod(this.FallBackMethod);
        //2.调用降级的方法
        Object fallBackResult = fallBackMethod.Invoke(context.Implementation, context.Parameters);
        //3.把降级方法的返回值返回
        context.ReturnValue = fallBackResult;
      }
    }
  }
}

2、编写类

public class Person//需要public类
{
  [HystrixCommand(nameof(HelloFallBackAsync))]        
  public virtual async Task<string> HelloAsync(string name)//需要是虚方法
  {
    Console.WriteLine("hello"+name);
    String s = null;
    // s.ToString(); 
    return "ok";
  }
  public async Task<string> HelloFallBackAsync(string name)
  {
    Console.WriteLine("执行失败"+name);               
    return "fail";
  }
  [HystrixCommand(nameof(AddFall))]         
  public virtual int Add(int i,int j)
  {
    String s = null;    
    //s.ToArray();   
    return i + j;
  }
  public int AddFall(int i, int j)
  {
    return 0;
  }
}

3、创建代理对象

ProxyGeneratorBuilder proxyGeneratorBuilder = new ProxyGeneratorBuilder(); 
using (IProxyGenerator proxyGenerator = proxyGeneratorBuilder.Build())
{
  Person p = proxyGenerator.CreateClassProxy<Person>();
  Console.WriteLine(p.HelloAsync("yzk").Result);
  Console.WriteLine(p.Add(1, 2));
}

上面的代码还支持多次降级,方法上标注[HystrixCommand]并且virtual即可:

public class Person//需要public类
{
  [HystrixCommand(nameof(Hello1FallBackAsync))]
  public virtual async Task<string> HelloAsync(string name)//需要是虚方法
  {
    Console.WriteLine("hello" + name);           
    String s = null;            
    s.ToString();               
    return "ok";
  }
  [HystrixCommand(nameof(Hello2FallBackAsync))]      
  public virtual async Task<string> Hello1FallBackAsync(string name)
  {
    Console.WriteLine("Hello降级1" + name);                  
    String s = null;    
    s.ToString();               
    return "fail_1";
  }
  public virtual async Task<string> Hello2FallBackAsync(string name)
  {
    Console.WriteLine("Hello降级2" + name);
    return "fail_2";
  }
  [HystrixCommand(nameof(AddFall))]         
  public virtual int Add(int i, int j)
  {
    String s = null;
    s.ToString();               
    return i + j;
  }
  public int AddFall(int i, int j)
  {
    return 0;
  }
}

细化框架
上面明白了了原理,然后直接展示写好的更复杂的HystrixCommandAttribute,讲解代码。

这是杨中科老师维护的开源项目

github最新地址 https://github.com/yangzhongke/RuPeng.HystrixCore

Nuget地址:https://www.nuget.org/packages/RuPeng.HystrixCore

重试:MaxRetryTimes表示最多重试几次,如果为0则不重试,RetryIntervalMilliseconds 表示重试间隔的毫秒数;

熔断:EnableCircuitBreaker是否启用熔断,ExceptionsAllowedBeforeBreaking表示熔断前出现允许错误几次,MillisecondsOfBreak表示熔断多长时间(毫秒);

超时:TimeOutMilliseconds执行超过多少毫秒则认为超时(0表示不检测超时)

缓存:CacheTTLMilliseconds 缓存多少毫秒(0 表示不缓存),用“类名+方法名+所有参数值 ToString拼接”做缓存Key(唯一的要求就是参数的类型ToString对于不同对象一定要不一样)。

用到了缓存组件:Install-Package Microsoft.Extensions.Caching.Memory

using System;
using AspectCore.DynamicProxy;
using System.Threading.Tasks;
using Polly;

namespace RuPeng.HystrixCore
{
    [AttributeUsage(AttributeTargets.Method)]
    public class HystrixCommandAttribute : AbstractInterceptorAttribute
    {
        /// <summary> 
        /// 最多重试几次,如果为0则不重试 
        /// </summary> 
        public int MaxRetryTimes { get; set; } = 0;

        /// <summary> 
        /// 重试间隔的毫秒数 
        /// </summary>         
     public int RetryIntervalMilliseconds { get; set; } = 100; 

        /// <summary> 
        /// 是否启用熔断 
        /// </summary>         
     public bool EnableCircuitBreaker { get; set; } = false; 

        /// <summary> 
        /// 熔断前出现允许错误几次 
        /// </summary>         
     public int ExceptionsAllowedBeforeBreaking { get; set; } = 3; 

        /// <summary> 
        /// 熔断多长时间(毫秒) 
        /// </summary>         
     public int MillisecondsOfBreak { get; set; } = 1000; 

        /// <summary> 
        /// 执行超过多少毫秒则认为超时(0表示不检测超时) 
        /// </summary>         
     public int TimeOutMilliseconds { get; set; } = 0; 

        /// <summary> 
        /// 缓存多少毫秒(0表示不缓存),用“类名+方法名+所有参数ToString拼接”做缓存Key 
        /// </summary> 
        public int CacheTTLMilliseconds { get; set; } = 0;

     //由于CircuitBreaker要求同一段代码必须共享同一个Policy对象。
        //而方法上标注的Attribute 对于这个方法来讲就是唯一的对象,一个方法对应一个方法上标注的Attribute对象。
        //一般我们熔断控制是针对一个方法,一个方法无论是通过几个 Person 对象调用,无论是谁调用,只要全局出现ExceptionsAllowedBeforeBreaking次错误,就会熔断,这是框架的实现,你如果认为不合理,自己改去。
        //我们在Attribute上声明一个Policy的成员变量,这样一个方法就对应一个Policy对象。
        private Policy policy;

        private static readonly Microsoft.Extensions.Caching.Memory.IMemoryCache memoryCache = new Microsoft.Extensions.Caching.Memory.MemoryCache(new Microsoft.Extensions.Caching.Memory.MemoryCacheOptions());

        /// <summary> 
        ///  
        /// </summary> 
        /// <param name="fallBackMethod">降级的方法名</param>         
        public HystrixCommandAttribute(string fallBackMethod)
        {
            this.FallBackMethod = fallBackMethod;
        }

        public string FallBackMethod { get; set; }

        public override async Task Invoke(AspectContext context, AspectDelegate next)
        {
            //一个HystrixCommand中保持一个policy对象即可 
            //其实主要是CircuitBreaker要求对于同一段代码要共享一个policy对象 
            //根据反射原理,同一个方法就对应一个HystrixCommandAttribute,无论几次调用, 
            //而不同方法对应不同的HystrixCommandAttribute对象,天然的一个policy对象共享  
       //因为同一个方法共享一个policy,因此这个CircuitBreaker是针对所有请求的。             
       //Attribute也不会在运行时再去改变属性的值,共享同一个policy对象也没问题             
       lock (this)//因为Invoke可能是并发调用,因此要确保policy赋值的线程安全 
            {
                if (policy == null)
                {
                    policy = Policy.NoOpAsync();//创建一个空的Policy                     
            if (EnableCircuitBreaker) //先保证熔断
                    {
                        policy = policy.WrapAsync(Policy.Handle<Exception>().CircuitBreakerAsync(ExceptionsAllowedBeforeBreaking, TimeSpan.FromMilliseconds(MillisecondsOfBreak)));
                    }
                    if (TimeOutMilliseconds > 0) //控制是否超时
                    {
                        policy = policy.WrapAsync(Policy.TimeoutAsync(() => TimeSpan.FromMilliseconds(TimeOutMilliseconds), Polly.Timeout.TimeoutStrategy.Pessimistic));
                    }
                    if (MaxRetryTimes > 0)  //如果出错等待MaxRetryTimes时间在执行
                    {
                        policy = policy.WrapAsync(Policy.Handle<Exception>().WaitAndRetryAsync(MaxRetryTimes, i => TimeSpan.FromMilliseconds(RetryIntervalMilliseconds)));
                    }
                    Policy policyFallBack = Policy
                    .Handle<Exception>()  //出错了报错   如果出错就尝试调用降级方法
                    .FallbackAsync(async (ctx, t) =>
                    {
                        //这里拿到的就是ExecuteAsync(ctx => next(context), pollyCtx);这里传的 pollyCtx 
                        AspectContext aspectContext = (AspectContext)ctx["aspectContext"];
                        var fallBackMethod = context.ServiceMethod.DeclaringType.GetMethod(this.FallBackMethod);
                        Object fallBackResult = fallBackMethod.Invoke(context.Implementation, context.Parameters);
                        //不能如下这样,因为这是闭包相关,如果这样写第二次调用Invoke的时候context指向的 
                        //还是第一次的对象,所以要通过Polly的上下文来传递AspectContext 
                        //context.ReturnValue = fallBackResult;                         
                        aspectContext.ReturnValue = fallBackResult;
                    }, async (ex, t) => { });
                    policy = policyFallBack.WrapAsync(policy);
                }
            }

            //把本地调用的AspectContext传递给Polly,主要给FallbackAsync中使用,避免闭包的坑 
            Context pollyCtx = new Context();//Context是polly中通过Execute给FallBack、Execute等回调方法传上下文对象使用的
            pollyCtx["aspectContext"] = context;//context是aspectCore的上下文 

            //Install-Package Microsoft.Extensions.Caching.Memory             
            if (CacheTTLMilliseconds > 0)
            {
                //用类名+方法名+参数的下划线连接起来作为缓存key 
                string cacheKey = "HystrixMethodCacheManager_Key_" + context.ServiceMethod.DeclaringType + "." + context.ServiceMethod + string.Join("_", context.Parameters);
                //尝试去缓存中获取。如果找到了,则直接用缓存中的值做返回值 
                if (memoryCache.TryGetValue(cacheKey, out var cacheValue))
                {
                    context.ReturnValue = cacheValue;
                }
                else
                {
                    //如果缓存中没有,则执行实际被拦截的方法 
                    await policy.ExecuteAsync(ctx => next(context), pollyCtx);
                    //存入缓存中 
                    using (var cacheEntry = memoryCache.CreateEntry(cacheKey))
                    {
                        cacheEntry.Value = context.ReturnValue;//返回值放入缓存                         
                        cacheEntry.AbsoluteExpiration = DateTime.Now + TimeSpan.FromMilliseconds(CacheTTLMilliseconds);
                    }
                }
            }
            else//如果没有启用缓存,就直接执行业务方法 
            {
                await policy.ExecuteAsync(ctx => next(context), pollyCtx);
            }
        }
    }
}

框架不是万能的,不用过度框架,过度框架带来的复杂度陡增,从人人喜欢变成人人恐惧。

结合 asp.net core依赖注入
在asp.net core项目中,可以借助于asp.net core的依赖注入,简化代理类对象的注入,不用再自己调用ProxyGeneratorBuilder 进行代理类对象的注入了。

Install-Package AspectCore.Extensions.DependencyInjection
修改Startup.cs的ConfigureServices方法,把返回值从void改为IServiceProvider

using AspectCore.Extensions.DependencyInjection;
public IServiceProvider ConfigureServices(IServiceCollection services)
{
    services.AddMvc();
    services.AddSingleton<Person>();
    return services.BuildAspectCoreServiceProvider();
}

其 中 services.AddSingleton<Person>(); 表 示 把Person注 入 。

BuildAspectCoreServiceProvider是让aspectcore接管注入。

在Controller中就可以通过构造函数进行依赖注入了:

public class ValuesController : Controller
{
    private Person p;
    public ValuesController(Person p)
    {
        this.p = p;
    }
}

通过反射扫描所有Service类,只要类中有标记了CustomInterceptorAttribute的方法都算作服务实现类。为了避免一下子扫描所有类,所以 RegisterServices 还是手动指定从哪个程序集中加载。

public IServiceProvider ConfigureServices(IServiceCollection services)
{
    services.AddMvc();
    RegisterServices(this.GetType().Assembly, services); return services.BuildAspectCoreServiceProvider();
}

private static void RegisterServices(Assembly asm, IServiceCollection services)
{
    //遍历程序集中的所有public类型
    foreach (Type type in asm.GetExportedTypes())
    {
        //判断类中是否有标注了CustomInterceptorAttribute的方法
        bool hasCustomInterceptorAttr = type.GetMethods().Any(m => m.GetCustomAttribute(typeof(CustomInterceptorAttribute)) != null);
        if (hasCustomInterceptorAttr)
        {
            services.AddSingleton(type);
        }
    }
} 

注:此文章是我看杨中科老师的.Net Core微服务第二版和.Net Core微服务第二版课件整理出来的


(6)学习笔记 ) ASP.NET CORE微服务 Micro-Service ---- AOP框架


AOP 框架基础
要求懂的知识:AOP、Filter、反射(Attribute)。

如果直接使用 Polly,那么就会造成业务代码中混杂大量的业务无关代码。我们使用 AOP (如果不了解 AOP,请自行参考网上资料)的方式封装一个简单的框架,模仿 Spring cloud 中的 Hystrix。

需要先引入一个支持.Net Core 的 AOP,我们用.Net Core 下的 AOP 框架是AspectCore(国产,动态织入),其他要不就是不支持.Net Core,要不就是不支持对异步方法进行拦截 MVC Filter。

GitHub:https://github.com/dotnetcore/AspectCore-Framework

Install-Package AspectCore.Core -Version 0.5.0

这里只介绍和我们相关的用法:

1、编写拦截器CustomInterceptorAttribute 一般继承自AbstractInterceptorAttribute

public class CustomInterceptorAttribute:AbstractInterceptorAttribute
{
   //每个被拦截的方法中执行
  public async override Task Invoke(AspectContext context, AspectDelegate next)
  {        
    try
    {
      Console.WriteLine("执行之前");                      
      await next(context);//执行被拦截的方法
    }
    catch (Exception)
    {
      Console.WriteLine("被拦截的方法出现异常");                        
      throw;
    }
    finally
    {
      Console.WriteLine("执行之后");
    }
  }
}

2、编写需要被代理拦截的类

在要被拦截的方法上标注CustomInterceptorAttribute 。类需要是public类,方法如果需要拦截就是虚方法,支持异步方法,因为动态代理是动态生成被代理的类的动态子类实现的。

public class Person
{
    [CustomInterceptor]          
    public virtual void Say(string msg)
    {
         Console.WriteLine("service calling..."+msg);
    }
}

3、通过AspectCore创建代理对象

ProxyGeneratorBuilder proxyGeneratorBuilder = new ProxyGeneratorBuilder(); 
using (IProxyGenerator proxyGenerator = proxyGeneratorBuilder.Build())
{
    Person p = proxyGenerator.CreateClassProxy<Person>();
    p.Say("rupeng.com");
}
Console.ReadKey();

注意p指向的对象是AspectCore生成的Person的动态子类的对象,直接new Person是无法被拦截的。

研究AOP细节
拦截器中Invoke方法下的参数AspectContext的属性的含义:

Implementation 实际动态创建的Person子类的对象。

ImplementationMethod就是Person子类的Say方法

Parameters 方法的参数值。

Proxy==Implementation:当前场景下

ProxyMethod==ImplementationMethod:当前场景下

ReturnValue返回值

ServiceMethod是Person的Say方法

注:此文章是我看杨中科老师的.Net Core微服务第二版和.Net Core微服务第二版课件整理出来的


(5)ASP.NET CORE微服务 Micro-Service ---- 熔断降级(Polly)


一、 什么是熔断降级
熔断就是“保险丝”。当出现某些状况时,切断服务,从而防止应用程序不断地尝试执行可能会失败的操作给系统造成“雪崩”,或者大量的超时等待导致系统卡死。

降级的目的是当某个服务提供者发生故障的时候,向调用方返回一个错误响应或者替代响应。举例子:调用联通接口服务器发送短信失败之后,改用移动短信服务器发送,如果移动短信服务器也失败,则改用电信短信服务器,如果还失败,则返回“失败”响应;在从推荐商品服务器加载数据的时候,如果失败,则改用从缓存中加载,如果缓存中也加载失败,则返回一些本地替代数据。

二、 Polly 简介
.Net Core 中有一个被.Net 基金会认可的库 Polly,可以用来简化熔断降级的处理。主要功能:重试(Retry);断路器(Circuit-breaker);超时检测(Timeout);缓存(Cache);降级(FallBack);

官网:https://github.com/App-vNext/Polly

介绍文章:https://www.cnblogs.com/CreateMyself/p/7589397.html

Nuget安装指令:Install-Package Polly -Version 6.0.1

Polly 的策略由“故障”和“动作”两部分组成,“故障”包括异常、超时、返回值错误等情况,“动作”包括 降级(FallBack)、重试(Retry)、熔断(Circuit-breaker)等。

策略用来执行可能会有有故障的业务代码,当业务代码出现“故障”中情况的时候就执行“动作”。

由于实际业务代码中故障情况很难重现出来,所以 Polly 这一些都是用一些无意义的代码模拟出来。

Polly 也支持请求缓存“数据不变化则不重复自行代码”,但是和新版本兼容不好,而且功能局限性很大,因此这里不讲。

由于调试器存在,看不清楚 Polly 的执行过程,因此本节都用【开始执行(不调试)】

三、Polly简单使用
使用Policy的静态方法创建ISyncPolicy实现类对象,创建方法既有同步方法也有异步方法,根据自己的需要选择。下面先演示同步的,异步的用法类似。

举例:当发生ArgumentException异常的时候,执行Fallback代码。

Policy policy = Policy
.Handle<ArgumentException>() //故障
.Fallback(() =>//动作
{
  Console.WriteLine("执行出错");
});
policy.Execute(() => {//在策略中执行业务代码
//这里是可能会产生问题的业务系统代码
Console.WriteLine("开始任务");  throw new ArgumentException("Hello world!");
  Console.WriteLine("完成任务");
});
Console.ReadKey();

如果没有被Handle处理的异常,则会导致未处理异常被抛出。

还可以用Fallback的其他重载获取异常信息:

Policy policy = Policy
.Handle<ArgumentException>() //故障
.Fallback(() =>//动作
{
  Console.WriteLine("执行出错");
},ex=> {
  Console.WriteLine(ex);
});
policy.Execute(() => {
  //在策略中执行业务代码
  //这里是可能会产生问题的业务系统代码
  Console.WriteLine("开始任务1");
  throw new ArgumentException("Hello1 world!");
  Console.WriteLine("完成任务");
});

如果Execute中的代码是带返回值的,那么只要使用带泛型的Policy<T>类即可:

Policy<string> policy = Policy<string>
.Handle<Exception>() //故障
.Fallback(() =>//动作
{
  Console.WriteLine("执行出错");        
  return "降级的值";
});
string value = policy.Execute(() => {
  Console.WriteLine("开始任务");
  throw new Exception("Hello world!");
  Console.WriteLine("完成任务");
  return "正常的值";
});
Console.WriteLine("返回值:"+value);

FallBack的重载方法也非常多,有的异常可以直接提供降级后的值。

()异常中还可以通过lambda表达式对异常判断“满足**条件的异常我才处理”,简单看看试试重载即可。还可以多个Or处理各种不同的异常。

(*)还可以用HandleResult等判断返回值进行故障判断等,我感觉没太大必要。

四、重试处理

Policy policy = Policy
.Handle<Exception>()
.RetryForever(); 
policy.Execute(() => {
    Console.WriteLine("开始任务");        
    if (DateTime.Now.Second % 10 != 0)
    {
        throw new Exception("出错");
    }
    Console.WriteLine("完成任务");
});    

RetryForever()是一直重试直到成功

Retry()是重试最多一次;

Retry(n) 是重试最多n次;

WaitAndRetry()可以实现“如果出错等待100ms再试还不行再等150ms秒。。。。”,重载方法很多,不再一一介绍。还有WaitAndRetryForever。

五、 短路保护 Circuit Breaker
  出现N次连续错误,则把“熔断器”(保险丝)熔断,等待一段时间,等待这段时间内如果再Execute 则直接抛出BrokenCircuitException异常,根本不会再去尝试调用业务代码。等待时间过去之后,再执行Execute的时候如果又错了(一次就够了),那么继续熔断一段时间,否则就恢复正常。

这样就避免一个服务已经不可用了,还是使劲的请求给系统造成更大压力。

Policy policy = Policy
.Handle<Exception>()
.CircuitBreaker(6,TimeSpan.FromSeconds(5));//连续出错6次之后熔断5秒(不会再去尝试执行业务代码)。
 while(true)
{
    Console.WriteLine("开始Execute");
    try
    {
        policy.Execute(() => {                                       
            Console.WriteLine("开始任务");                   
            throw new Exception("出错");                    
            Console.WriteLine("完成任务");
        });
    }
    catch(Exception ex)
    {
        Console.WriteLine("execute出错"+ex);
    }          
    Thread.Sleep(500);
} 

其计数的范围是policy对象,所以如果想整个服务器全局对于一段代码做短路保护,则需要共用一个policy对象。

六、策略封装,包裹Warp
可以把多个ISyncPolicy合并到一起执行:

policy3= policy1.Wrap(policy2);

执行policy3就会把policy1、policy2封装到一起执行。

Policy的静态方法Wrap可以把更多的policy一起封装:

policy9=Policy.Wrap(policy1, policy2, policy3, policy4, policy5);

七、超时处理
这些处理不能简单的链式调用,要用到Wrap。例如下面实现“出现异常则重试三次,如果还出错就FallBack”这样是不行的

Policy policy = Policy.Handle<Exception>().Retry(3).Fallback(()=> { Console.WriteLine("执行出错"); });//这样不行,系统会直接报错

注意Wrap是有包裹顺序的,内层的故障如果没有被处理则会抛出到外层。

下面代码实现了“出现异常则重试三次,如果还出错就FallBack”

Policy policyRetry = Policy.Handle<Exception>().Retry(3); //出现异常重试三次
Policy policyFallback = Policy
.Handle<Exception>()
.Fallback(()=> {
    Console.WriteLine("降级");
});
//Wrap:包裹。policyRetry在里面,policyFallback裹在外面。
//如果里面出现了故障,则把故障抛出来给外面
Policy policy = policyFallback.Wrap(policyRetry);
policy.Execute(()=> {
    Console.WriteLine("开始任务");       
    if (DateTime.Now.Second % 10 != 0)
    {
      throw new Exception("出错");
    }
  Console.WriteLine("完成任务");
});

Timeout是定义超时故障,如果超时会抛出TimeoutRejectedException异常。

Policy policy = Policy.Timeout(3, TimeoutStrategy.Pessimistic);// 创建一个3秒钟(注意单位)的超时策略。

Timeout生成的Policy要和其他Policy一起Wrap使用。

超时策略一般不能直接用,而是和其他封装到一起用:

Policy policy = Policy
.Handle<TimeoutRejectedException>()  //定义所处理的故障
.Fallback(() =>
{
    Console.WriteLine("降级");
});
policy = policy.Wrap(Policy.Timeout(2,TimeoutStrategy.Pessimistic));
policy.Execute(()=> {
    Console.WriteLine("开始任务");
    Thread.Sleep(5000);
    Console.WriteLine("完成任务");
});

上面的代码就是如果执行超过2秒钟,则直接Fallback。 这个的用途:请求网络接口,避免接口长期没有响应造成系统卡死。

八、Polly 的异步用法
所有方法都用Async方法即可,Handle由于只是定义异常,所以不需要异常方法:

带返回值的例子:

Policy<byte[]> policy = Policy<byte[]>
.Handle<Exception>()            
.FallbackAsync(async c => {
    Console.WriteLine("降级");        
    return new byte[0];
},async r=> { 
    Console.WriteLine(r.Exception);
}); 
policy = policy.WrapAsync(
  Policy.TimeoutAsync(2, TimeoutStrategy.Pessimistic, async(context, timespan, task) =>
  {
      Console.WriteLine("timeout");
  })
);
var bytes = await policy.ExecuteAsync(async () => {
    Console.WriteLine("开始任务");
    HttpClient httpClient = new HttpClient();    
    var result = await httpClient.GetByteArrayAsync("http://static.rupeng.com/upload/chatimage/20183/07EB793A4C247A654B31B4D14EC64BCA.png");
    Console.WriteLine("完成任务");        
    return result;
});
Console.WriteLine("bytes长度"+bytes.Length);

没返回值的例子:

Policy policy = Policy
.Handle<Exception>()            
.FallbackAsync(async c => {
    Console.WriteLine("降级");
},async ex=> {//对于没有返回值的,这个参数直接是异常
    Console.WriteLine(ex);
});  
policy = policy.WrapAsync(Policy.TimeoutAsync(3, TimeoutStrategy.Pessimistic, async(context, timespan, task) =>
{
    Console.WriteLine("timeout");
}));
await policy.ExecuteAsync(async () => {
    Console.WriteLine("开始任务");
    await Task.Delay(5000);//注意不能用Thread.Sleep(5000);
    Console.WriteLine("完成任务");
});

注:此文章是我看杨中科老师的.Net Core微服务第二版和.Net Core微服务第二版课件整理出来的


(4)ASP.NET CORE微服务 Micro-Service ---- Consul服务发现和消费


上一章说了 Consul服务注册 现在我要连接上Consul里面的服务 请求它们的API接口 应该怎么做呢?

1.找Consul要一台你需要的服务器

1.1 获取Consul下的所有注册的服务

using (var consulClient = new ConsulClient(c => c.Address = new Uri("http://127.0.0.1:8500"))) 
{ 
   var services = consulClient.Agent.Services().Result.Response;  
   foreach(var service in services.Values) 
     { 
       Console.WriteLine($"id={service.ID},name={service.Service},ip={service.Address},port={service.Port}"); 
     } 
} 

1.2 随机取一个Name为MsgService的服务

下面的代码使用当前 TickCount 进行取模的方式达到随机获取一台服务器实例的效果,这叫做“客户端负载均衡”:

using (var consulClient = new ConsulClient(c => c.Address = new Uri("http://127.0.0.1:8500"))) 
{ 
     var services = consulClient.Agent.Services().Result.Response.Values.Where(s => s.Service.Equals("MsgService", StringComparison.OrdinalIgnoreCase));  
     if(!services.Any()) 
     { 
          Console.WriteLine("找不到服务的实例"); 
     } 
     else 
     { 
          var service = services.ElementAt(Environment.TickCount%services.Count()); 
          Console.WriteLine($"{service.Address}:{service.Port}"); 
     } 
} 

当然在一个毫秒之类会所有请求都压给一台服务器,基本就够用了。也可以自己写随机、轮询等客户端负载均衡算法,也可以自己实现按不同权重分配(注册时候 Tags 带上配置、权重等信息)等算法。

2.请求服务器的接口

你拿到了http地址 难道还不会请求接口么 找个httphelper 直接请求就好了 如果还是不会 就来群里问吧 群号:608188505

给大家上一个 我常用的httphelper 可能被我该的不像样了 不过相信大家都会用 不会的话 来群里找我吧。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Runtime.InteropServices;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;

namespace ClientApp
{/// <summary>
 /// Http连接操作帮助类
 /// </summary>
    public class HttpHelper
    {
        private const int ConnectionLimit = 100;
        //编码
        private Encoding _encoding = Encoding.Default;
        //浏览器类型
        private string[] _useragents = new string[]{
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
            "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/7.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0)",
            "Mozilla/5.0 (Windows NT 6.1; rv:36.0) Gecko/20100101 Firefox/36.0",
            "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20130401 Firefox/31.0"
        };

        private String _useragent = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36";
        //接受类型
        private String _accept = "text/html, application/xhtml+xml, application/xml, */*";
        //超时时间
        private int _timeout = 30 * 1000;
        //类型
        private string _contenttype = "application/x-www-form-urlencoded";
        //cookies
        private String _cookies = "";
        //cookies
        private CookieCollection _cookiecollection;
        //custom heads
        private Dictionary<string, string> _headers = new Dictionary<string, string>();

        public HttpHelper()
        {
            _headers.Clear();
            //随机一个useragent
            _useragent = _useragents[new Random().Next(0, _useragents.Length)];
            //解决性能问题?
            ServicePointManager.DefaultConnectionLimit = ConnectionLimit;
        }

        public void InitCookie()
        {
            _cookies = "";
            _cookiecollection = null;
            _headers.Clear();
        }

        /// <summary>
        /// 设置当前编码
        /// </summary>
        /// <param name="en"></param>
        public void SetEncoding(Encoding en)
        {
            _encoding = en;
        }

        /// <summary>
        /// 设置UserAgent
        /// </summary>
        /// <param name="ua"></param>
        public void SetUserAgent(String ua)
        {
            _useragent = ua;
        }

        public void RandUserAgent()
        {
            _useragent = _useragents[new Random().Next(0, _useragents.Length)];
        }

        public void SetCookiesString(string c)
        {
            _cookies = c;
        }

        /// <summary>
        /// 设置超时时间
        /// </summary>
        /// <param name="sec"></param>
        public void SetTimeOut(int msec)
        {
            _timeout = msec;
        }

        public void SetContentType(String type)
        {
            _contenttype = type;
        }

        public void SetAccept(String accept)
        {
            _accept = accept;
        }

        /// <summary>
        /// 添加自定义头
        /// </summary>
        /// <param name="key"></param>
        /// <param name="ctx"></param>
        public void AddHeader(String key, String ctx)
        {
            //_headers.Add(key,ctx);
            _headers[key] = ctx;
        }

        /// <summary>
        /// 清空自定义头
        /// </summary>
        public void ClearHeader()
        {
            _headers.Clear();
        }

        /// <summary>
        /// 获取HTTP返回的内容
        /// </summary>
        /// <param name="response"></param>
        /// <returns></returns>
        private String GetStringFromResponse(HttpWebResponse response)
        {
            String html = "";
            try
            {
                Stream stream = response.GetResponseStream();
                StreamReader sr = new StreamReader(stream, Encoding.UTF8);
                html = sr.ReadToEnd();

                sr.Close();
                stream.Close();
            }
            catch (Exception e)
            {
                Trace.WriteLine("GetStringFromResponse Error: " + e.Message);
            }

            return html;
        }

        /// <summary>
        /// 检测证书
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="certificate"></param>
        /// <param name="chain"></param>
        /// <param name="errors"></param>
        /// <returns></returns>
        private bool CheckCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors errors)
        {
            return true;
        }

        /// <summary>
        /// 发送GET请求
        /// </summary>
        /// <param name="url"></param>
        /// <returns></returns>
        public String HttpGet(String url)
        {
            return HttpGet(url, url);
        }


        /// <summary>
        /// 发送GET请求
        /// </summary>
        /// <param name="url"></param>
        /// <param name="refer"></param>
        /// <returns></returns>
        public String HttpGet(String url, String refer)
        {
            String html;
            try
            {
                ServicePointManager.ServerCertificateValidationCallback = new System.Net.Security.RemoteCertificateValidationCallback(CheckCertificate);
                HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(url);
                request.UserAgent = _useragent;
                request.Timeout = _timeout;
                request.ContentType = _contenttype;
                request.Accept = _accept;
                request.Method = "GET";
                request.Referer = refer;
                request.KeepAlive = true;
                request.AllowAutoRedirect = true;
                request.UnsafeAuthenticatedConnectionSharing = true;
                request.CookieContainer = new CookieContainer();
                //据说能提高性能
                //request.Proxy = null;
                if (_cookiecollection != null)
                {
                    foreach (Cookie c in _cookiecollection)
                    {
                        c.Domain = request.Host;
                    }

                    request.CookieContainer.Add(_cookiecollection);
                }

                foreach (KeyValuePair<String, String> hd in _headers)
                {
                    request.Headers[hd.Key] = hd.Value;
                }

                HttpWebResponse response = (HttpWebResponse)request.GetResponse();
                html = GetStringFromResponse(response);
                if (request.CookieContainer != null)
                {
                    response.Cookies = request.CookieContainer.GetCookies(request.RequestUri);
                }

                if (response.Cookies != null)
                {
                    _cookiecollection = response.Cookies;
                }
                if (response.Headers["Set-Cookie"] != null)
                {
                    string tmpcookie = response.Headers["Set-Cookie"];
                    _cookiecollection.Add(ConvertCookieString(tmpcookie));
                }

                response.Close();
                return html;
            }
            catch (Exception e)
            {
                Trace.WriteLine("HttpGet Error: " + e.Message);
                return String.Empty;
            }
        }

        /// <summary>
        /// 获取MINE文件
        /// </summary>
        /// <param name="url"></param>
        /// <returns></returns>
        public Byte[] HttpGetMine(String url)
        {
            Byte[] mine = null;
            try
            {
                ServicePointManager.ServerCertificateValidationCallback = new System.Net.Security.RemoteCertificateValidationCallback(CheckCertificate);
                HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(url);
                request.UserAgent = _useragent;
                request.Timeout = _timeout;
                request.ContentType = _contenttype;
                request.Accept = _accept;
                request.Method = "GET";
                request.Referer = url;
                request.KeepAlive = true;
                request.AllowAutoRedirect = true;
                request.UnsafeAuthenticatedConnectionSharing = true;
                request.CookieContainer = new CookieContainer();
                //据说能提高性能
                request.Proxy = null;
                if (_cookiecollection != null)
                {
                    foreach (Cookie c in _cookiecollection)
                        c.Domain = request.Host;
                    request.CookieContainer.Add(_cookiecollection);
                }

                foreach (KeyValuePair<String, String> hd in _headers)
                {
                    request.Headers[hd.Key] = hd.Value;
                }

                HttpWebResponse response = (HttpWebResponse)request.GetResponse();
                Stream stream = response.GetResponseStream();
                MemoryStream ms = new MemoryStream();

                byte[] b = new byte[1024];
                while (true)
                {
                    int s = stream.Read(b, 0, b.Length);
                    ms.Write(b, 0, s);
                    if (s == 0 || s < b.Length)
                    {
                        break;
                    }
                }
                mine = ms.ToArray();
                ms.Close();

                if (request.CookieContainer != null)
                {
                    response.Cookies = request.CookieContainer.GetCookies(request.RequestUri);
                }

                if (response.Cookies != null)
                {
                    _cookiecollection = response.Cookies;
                }
                if (response.Headers["Set-Cookie"] != null)
                {
                    _cookies = response.Headers["Set-Cookie"];
                }

                stream.Close();
                stream.Dispose();
                response.Close();
                return mine;
            }
            catch (Exception e)
            {
                Trace.WriteLine("HttpGetMine Error: " + e.Message);
                return null;
            }
        }

        /// <summary>
        /// 发送POST请求
        /// </summary>
        /// <param name="url"></param>
        /// <param name="data"></param>
        /// <returns></returns>
        public String HttpPost(String url, String data)
        {
            return HttpPost(url, data, url,null);
        }

        /// <summary>
        /// 发送POST请求
        /// </summary>
        /// <param name="url"></param>
        /// <param name="data"></param>
        /// <param name="refer"></param>
        /// <returns></returns>
        public String HttpPost(String url, String data, String refer,string cookie)
        {
            String html;
            try
            {
                ServicePointManager.ServerCertificateValidationCallback = new System.Net.Security.RemoteCertificateValidationCallback(CheckCertificate);
                HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(url);
                request.UserAgent = _useragent;
                request.Timeout = _timeout;
                request.Referer = refer;
                request.ContentType = _contenttype;
                request.Accept = _accept;
                request.Method = "POST";
                request.KeepAlive = true;
                request.AllowAutoRedirect = true;

                request.CookieContainer = new CookieContainer();
                if (!string.IsNullOrEmpty(cookie))
                {
                    _cookiecollection = this.ConvertCookieString(cookie);
                }
                //据说能提高性能
                request.Proxy = null;

                if (_cookiecollection != null)
                {
                    foreach (Cookie c in _cookiecollection)
                    {
                        c.Domain = request.Host;
                        if (c.Domain.IndexOf(':') > 0)
                            c.Domain = c.Domain.Remove(c.Domain.IndexOf(':'));
                    }
                    request.CookieContainer.Add(_cookiecollection);
                }

                foreach (KeyValuePair<String, String> hd in _headers)
                {
                    request.Headers[hd.Key] = hd.Value;
                }
                byte[] buffer = _encoding.GetBytes(data.Trim());
                request.ContentLength = buffer.Length;
                request.GetRequestStream().Write(buffer, 0, buffer.Length);
                request.GetRequestStream().Close();

                HttpWebResponse response = (HttpWebResponse)request.GetResponse();
                html = GetStringFromResponse(response);
                if (request.CookieContainer != null)
                {
                    response.Cookies = request.CookieContainer.GetCookies(request.RequestUri);
                }
                if (response.Cookies != null)
                {
                    _cookiecollection = response.Cookies;
                }
                if (response.Headers["Set-Cookie"] != null)
                {
                    string tmpcookie = response.Headers["Set-Cookie"];
                    _cookiecollection.Add(ConvertCookieString(tmpcookie));
                }

                response.Close();
                return html;
            }
            catch (Exception e)
            {
                Trace.WriteLine("HttpPost Error: " + e.Message);
                return String.Empty;
            }
        }


        public string UrlEncode(string str)
        {
            StringBuilder sb = new StringBuilder();
            byte[] byStr = _encoding.GetBytes(str);
            for (int i = 0; i < byStr.Length; i++)
            {
                sb.Append(@"%" + Convert.ToString(byStr[i], 16));
            }

            return (sb.ToString());
        }

        /// <summary>
        /// 转换cookie字符串到CookieCollection
        /// </summary>
        /// <param name="ck"></param>
        /// <returns></returns>
        private CookieCollection ConvertCookieString(string ck)
        {
            CookieCollection cc = new CookieCollection();
            string[] cookiesarray = ck.Split(";".ToCharArray(), StringSplitOptions.RemoveEmptyEntries);
            for (int i = 0; i < cookiesarray.Length; i++)
            {
                string[] cookiesarray_2 = cookiesarray[i].Split(",".ToCharArray(), StringSplitOptions.RemoveEmptyEntries);
                for (int j = 0; j < cookiesarray_2.Length; j++)
                {
                    string[] cookiesarray_3 = cookiesarray_2[j].Trim().Split("=".ToCharArray());
                    if (cookiesarray_3.Length == 2)
                    {
                        string cname = cookiesarray_3[0].Trim();
                        string cvalue = cookiesarray_3[1].Trim();
                        if (cname.ToLower() != "domain" && cname.ToLower() != "path" && cname.ToLower() != "expires")
                        {
                            Cookie c = new Cookie(cname, cvalue);
                            cc.Add(c);
                        }
                    }
                }
            }

            return cc;
        }


        public void DebugCookies()
        {
            Trace.WriteLine("**********************BEGIN COOKIES*************************");
            foreach (Cookie c in _cookiecollection)
            {
                Trace.WriteLine(c.Name + "=" + c.Value);
                Trace.WriteLine("Path=" + c.Path);
                Trace.WriteLine("Domain=" + c.Domain);
            }
            Trace.WriteLine("**********************END COOKIES*************************");
        }

    }
}

(3)ASP.NET CORE微服务 Micro-Service ---- Consul服务治理


Consul是注册中心,服务提供者、服务提供者、服务消费者等都要注册到Consul中,这样就可以实现服务提供者、服务消费者的隔离。

Consul就是来存储服务器名称与IP和端口对应关系的服务器

一、 consul 服务器安装

consul 下载地址 https://www.consul.io/ 墙外的网站 需要科学上网

cmd运行 consul.exe agent -dev

这是开发环境测试,生产环境要建集群,要至少一台 Server,多台 Agent (因为如果只有一台的话,如果服务死了 整个网站就出问题了)

开发环境中 consul 重启后数据就会丢失。

consul 的监控页面 http://127.0.0.1:850

consult 主要做三件事:提供服务到 ip 地址的注册;提供服务到 ip 地址列表的查询;对提供服务方的健康检查(HealthCheck);

二、 .Net Core 连接 consul

1.Nuget - > Install-Package Consul 安装Consul

2.提供一个HealthCheck API接口 用于Consul做健康检查调用,只要返回个结果就可以 不论是什么结果。如

[Route("api/[controller]")] 
public class HealthController : Controller 
{ 
     [HttpGet] 
     public IActionResult Get() 
     { 
          return Ok("ok"); 
     } 
}

3.服务注册 Consul 及注销

public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime applicationLifetime) 
{ 
     if (env.IsDevelopment()) 
     { 
          app.UseDeveloperExceptionPage(); 
     } 
 
     app.UseMvc(); 
 
     string ip = Configuration["ip"]; 
     int port = Convert.ToInt32(Configuration["port"]); 
 
     string serviceName = "MsgService";
    string serviceId = serviceName + Guid.NewGuid(); 
    using (var client = new ConsulClient(ConsulConfig)) 
     { 
          //注册服务到 Consul    ServiceRegister是一个异步方法
          client.Agent.ServiceRegister(new AgentServiceRegistration() 
          { 
               ID = serviceId,//服务编号,不能重复,用 Guid 最简单 
               Name = serviceName,//服务的名字 
               Address = ip,//服务提供者的能被消费者访问的 ip 地址(可以被其他应用访问的地址,本地测试可以用 127.0.0.1,机房环境中一定要写自己的内网 ip 地址) 
               Port = port,// 服务提供者的能被消费者访问的端口 
               Check = new AgentServiceCheck 
               { 
    DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5),//服务停止多久后反注册(注销) 
                    Interval = TimeSpan.FromSeconds(10),//健康检查时间间隔,或者称为心跳间隔 
                    HTTP = $"http://{ip}:{port}/api/health",//健康检查地址 
                    Timeout = TimeSpan.FromSeconds(5) 
               } 
          }).Wait();//Consult 客户端的所有方法几乎都是异步方法,但是都没按照规范加上Async 后缀,所以容易误导。记得调用后要 Wait()或者 await 
     } 
 
     //程序正常退出的时候从 Consul 注销服务    
     //要通过方法参数注入 IApplicationLifetime 
   //程序结束的时候会调用这个方法
     applicationLifetime.ApplicationStopped.Register(()=> {
    using (var client = new ConsulClient(ConsulConfig)) 
          { 
               client.Agent.ServiceDeregister(serviceId).Wait(); 
          } 
     }); 
} 
 
private void ConsulConfig(ConsulClientConfiguration c) 
{ 
     c.Address = new Uri("http://127.0.0.1:8500"); 
     c.Datacenter = "dc1"; 
}